From ebd2e8acfbf3d17d3135e24764660c5e46a04abc Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Wed, 20 Apr 2005 08:45:19 +0000 Subject: [PATCH] bitkeeper revision 1.1327.2.1 (4266169fT9qqykPCJWENnvnqwf71kQ) Refactor domain construction and device controllers to remove use of twisted. Use threads to dispatch channel input. Move device controllers into domains. Signed-off-by: Mike Wray --- .rootkeys | 3 + tools/python/xen/lowlevel/xu/xu.c | 174 ++-- tools/python/xen/xend/XendConsole.py | 31 +- tools/python/xen/xend/XendDomain.py | 385 ++++---- tools/python/xen/xend/XendDomainInfo.py | 900 ++++++++----------- tools/python/xen/xend/scheduler.py | 41 + tools/python/xen/xend/server/SrvDaemon.py | 441 +-------- tools/python/xen/xend/server/SrvDomainDir.py | 10 +- tools/python/xen/xend/server/blkif.py | 572 ++++++------ tools/python/xen/xend/server/channel.py | 584 ++++++------ tools/python/xen/xend/server/console.py | 273 +++--- tools/python/xen/xend/server/controller.py | 818 ++++++----------- tools/python/xen/xend/server/event.py | 198 ++++ tools/python/xen/xend/server/messages.py | 42 +- tools/python/xen/xend/server/netif.py | 406 ++++----- tools/python/xen/xend/server/pciif.py | 59 ++ tools/python/xen/xend/server/usbif.py | 493 +++++----- 17 files changed, 2468 insertions(+), 2962 deletions(-) create mode 100644 tools/python/xen/xend/scheduler.py create mode 100644 tools/python/xen/xend/server/event.py create mode 100644 tools/python/xen/xend/server/pciif.py diff --git a/.rootkeys b/.rootkeys index 3d59bded88..06baad058b 100644 --- a/.rootkeys +++ b/.rootkeys @@ -902,6 +902,7 @@ 40c9c468xzANp6o2D_MeCYwNmOIUsQ tools/python/xen/xend/XendVnet.py 40c9c468x191zetrVlMnExfsQWHxIQ tools/python/xen/xend/__init__.py 40c9c468S2YnCEKmk4ey8XQIST7INg tools/python/xen/xend/encode.py +4266169ezWIlXSfY50n6HSoVFbosmw tools/python/xen/xend/scheduler.py 40c9c468DCpMe542varOolW1Xc68ew tools/python/xen/xend/server/SrvBase.py 40c9c468IxQabrKJSWs0aEjl-27mRQ tools/python/xen/xend/server/SrvConsole.py 40c9c4689Io5bxfbYIfRiUvsiLX0EQ tools/python/xen/xend/server/SrvConsoleDir.py @@ -924,9 +925,11 @@ 40c9c469hJ_IlatRne-9QEa0-wlquw tools/python/xen/xend/server/console.py 40c9c469UcNJh_NuLU0ytorM0Lk5Ow tools/python/xen/xend/server/controller.py 40d83983OXjt-y3HjSCcuoPp9rzvmw tools/python/xen/xend/server/domain.py +4266169exkN9o3hA8vxe8Er0BZv1Xw tools/python/xen/xend/server/event.py 40c9c469yrm31i60pGKslTi2Zgpotg tools/python/xen/xend/server/messages.py 40c9c46925x-Rjb0Cv2f1-l2jZrPYg tools/python/xen/xend/server/netif.py 40c9c469ZqILEQ8x6yWy0_51jopiCg tools/python/xen/xend/server/params.py +4266169eI_oX3YBjwaeC0V-THBRnjg tools/python/xen/xend/server/pciif.py 41ee5e8dq9NtihbL4nWKjuSLOhXPUg tools/python/xen/xend/server/usbif.py 40c9c469LNxLVizOUpOjEaTKKCm8Aw tools/python/xen/xend/sxp.py 4189125cL90jKSOcBJ3Vx4nWGiXXvA tools/python/xen/xend/util.py diff --git a/tools/python/xen/lowlevel/xu/xu.c b/tools/python/xen/lowlevel/xu/xu.c index 15f9b5db40..b9f8cee097 100644 --- a/tools/python/xen/lowlevel/xu/xu.c +++ b/tools/python/xen/lowlevel/xu/xu.c @@ -304,20 +304,6 @@ static PyObject *xu_notifier_read(PyObject *self, PyObject *args) return Py_None; } -/* this is now a NOOP */ -static PyObject *xu_notifier_unmask(PyObject *self, PyObject *args) -{ - Py_INCREF(Py_None); - return Py_None; -} - -/* this is now a NOOP */ -static PyObject *xu_notifier_bind(PyObject *self, PyObject *args) -{ - Py_INCREF(Py_None); - return Py_None; -} - static PyObject *xu_notifier_bind_virq(PyObject *self, PyObject *args, PyObject *kwds) { @@ -366,13 +352,6 @@ static PyObject *xu_notifier_virq_send(PyObject *self, return PyInt_FromLong(kmsg.u.virq.port); } -/* this is now a NOOP */ -static PyObject *xu_notifier_unbind(PyObject *self, PyObject *args) -{ - Py_INCREF(Py_None); - return Py_None; -} - static PyObject *xu_notifier_fileno(PyObject *self, PyObject *args) { return PyInt_FromLong(xcs_data_fd); @@ -384,21 +363,6 @@ static PyMethodDef xu_notifier_methods[] = { METH_VARARGS, "Read a @port with pending notifications.\n" }, - { "unmask", - (PyCFunction)xu_notifier_unmask, - METH_VARARGS, - "Unmask notifications for a @port.\n" }, - - { "bind", - (PyCFunction)xu_notifier_bind, - METH_VARARGS, - "Get notifications for a @port.\n" }, - - { "unbind", - (PyCFunction)xu_notifier_unbind, - METH_VARARGS, - "No longer get notifications for a @port.\n" }, - { "bind_virq", (PyCFunction)xu_notifier_bind_virq, METH_VARARGS | METH_KEYWORDS, @@ -1054,13 +1018,6 @@ typedef struct xu_port_object { static PyObject *port_error; -/* now a NOOP */ -static PyObject *xu_port_notify(PyObject *self, PyObject *args) -{ - Py_INCREF(Py_None); - return Py_None; -} - static PyObject *xu_port_read_request(PyObject *self, PyObject *args) { xu_port_object *xup = (xu_port_object *)self; @@ -1212,14 +1169,6 @@ static PyObject *xu_port_request_to_read(PyObject *self, PyObject *args) return PyInt_FromLong(found); } -static PyObject *xu_port_space_to_write_request(PyObject *self, PyObject *args) -{ - if ( !PyArg_ParseTuple(args, "") ) - return NULL; - - return PyInt_FromLong(1); -} - static PyObject *xu_port_response_to_read(PyObject *self, PyObject *args) { xu_port_object *xup = (xu_port_object *)self; @@ -1243,25 +1192,27 @@ static PyObject *xu_port_response_to_read(PyObject *self, PyObject *args) return PyInt_FromLong(found); } -static PyObject *xu_port_space_to_write_response( - PyObject *self, PyObject *args) +static void _xu_port_close(xu_port_object *xup ) { - if ( !PyArg_ParseTuple(args, "") ) - return NULL; - - return PyInt_FromLong(1); + if ( xup->connected && xup->remote_dom != 0 ) + { + xcs_msg_t kmsg; + kmsg.type = XCS_CIF_FREE_CC; + kmsg.u.interface.dom = xup->remote_dom; + kmsg.u.interface.local_port = xup->local_port; + kmsg.u.interface.remote_port = xup->remote_port; + xcs_ctrl_send(&kmsg); + xcs_ctrl_read(&kmsg); + xup->connected = 0; + } } -/* NOOP */ -static PyObject *xu_port_connect(PyObject *self, PyObject *args) +static PyObject *xu_port_close(PyObject *self, PyObject *args) { - Py_INCREF(Py_None); - return Py_None; -} + xu_port_object *xup = (xu_port_object *)self; + + _xu_port_close(xup); -/* NOOP */ -static PyObject *xu_port_disconnect(PyObject *self, PyObject *args) -{ Py_INCREF(Py_None); return Py_None; } @@ -1278,6 +1229,11 @@ static PyObject *xu_port_register(PyObject *self, PyObject *args, &type) ) return NULL; + if (!xup->connected) + { + return PyInt_FromLong(0); + } + msg.type = XCS_MSG_BIND; msg.u.bind.port = xup->local_port; msg.u.bind.type = type; @@ -1303,6 +1259,11 @@ static PyObject *xu_port_deregister(PyObject *self, PyObject *args, if ( !PyArg_ParseTupleAndKeywords(args, kwds, "i", kwd_list, &type) ) return NULL; + + if (!xup->connected) + { + return PyInt_FromLong(0); + } msg.type = XCS_MSG_UNBIND; msg.u.bind.port = xup->local_port; @@ -1319,10 +1280,6 @@ static PyObject *xu_port_deregister(PyObject *self, PyObject *args, } static PyMethodDef xu_port_methods[] = { - { "notify", - (PyCFunction)xu_port_notify, - METH_VARARGS, - "Send a notification to the remote end.\n" }, { "read_request", (PyCFunction)xu_port_read_request, @@ -1349,21 +1306,12 @@ static PyMethodDef xu_port_methods[] = { METH_VARARGS, "Returns TRUE if there is a request message to read.\n" }, - { "space_to_write_request", - (PyCFunction)xu_port_space_to_write_request, - METH_VARARGS, - "Returns TRUE if there is space to write a request message.\n" }, { "response_to_read", (PyCFunction)xu_port_response_to_read, METH_VARARGS, "Returns TRUE if there is a response message to read.\n" }, - { "space_to_write_response", - (PyCFunction)xu_port_space_to_write_response, - METH_VARARGS, - "Returns TRUE if there is space to write a response message.\n" }, - { "register", (PyCFunction)xu_port_register, METH_VARARGS | METH_KEYWORDS, @@ -1374,15 +1322,10 @@ static PyMethodDef xu_port_methods[] = { METH_VARARGS | METH_KEYWORDS, "Stop receiving a type of message on this port.\n" }, - { "connect", - (PyCFunction)xu_port_connect, - METH_VARARGS, - "Synchronously connect to remote domain.\n" }, - - { "disconnect", - (PyCFunction)xu_port_disconnect, + { "close", + (PyCFunction)xu_port_close, METH_VARARGS, - "Synchronously disconnect from remote domain.\n" }, + "Close the port.\n" }, { NULL, NULL, 0, NULL } }; @@ -1431,31 +1374,32 @@ static PyObject *xu_port_new(PyObject *self, PyObject *args, PyObject *kwds) static PyObject *xu_port_getattr(PyObject *obj, char *name) { xu_port_object *xup = (xu_port_object *)obj; + if ( strcmp(name, "local_port") == 0 ) - return PyInt_FromLong(xup->local_port); + { + return PyInt_FromLong(xup->connected ? xup->local_port : -1); + } if ( strcmp(name, "remote_port") == 0 ) - return PyInt_FromLong(xup->remote_port); + { + return PyInt_FromLong(xup->connected ? xup->remote_port : -1); + } if ( strcmp(name, "remote_dom") == 0 ) + { return PyInt_FromLong(xup->remote_dom); + } + if ( strcmp(name, "connected") == 0 ) + { + return PyInt_FromLong(xup->connected); + } return Py_FindMethod(xu_port_methods, obj, name); } static void xu_port_dealloc(PyObject *self) { - xu_port_object *xup = (xu_port_object *)self; - xcs_msg_t kmsg; - if ( xup->remote_dom != 0 ) - { - kmsg.type = XCS_CIF_FREE_CC; - kmsg.u.interface.dom = xup->remote_dom; - kmsg.u.interface.local_port = xup->local_port; - kmsg.u.interface.remote_port = xup->remote_port; - xcs_ctrl_send(&kmsg); - xcs_ctrl_read(&kmsg); - } - + _xu_port_close(xup); + PyObject_Del(self); } @@ -1638,6 +1582,26 @@ static PyObject *xu_buffer_full(PyObject *self, PyObject *args) return PyInt_FromLong(0); } +static PyObject *xu_buffer_size(PyObject *self, PyObject *args) +{ + xu_buffer_object *xub = (xu_buffer_object *)self; + + if ( !PyArg_ParseTuple(args, "") ) + return NULL; + + return PyInt_FromLong(xub->prod - xub->cons); +} + +static PyObject *xu_buffer_space(PyObject *self, PyObject *args) +{ + xu_buffer_object *xub = (xu_buffer_object *)self; + + if ( !PyArg_ParseTuple(args, "") ) + return NULL; + + return PyInt_FromLong(BUFSZ - (xub->prod - xub->cons)); +} + static PyMethodDef xu_buffer_methods[] = { { "peek", (PyCFunction)xu_buffer_peek, @@ -1669,6 +1633,16 @@ static PyMethodDef xu_buffer_methods[] = { METH_VARARGS, "Return TRUE if the buffer is full.\n" }, + { "size", + (PyCFunction)xu_buffer_size, + METH_VARARGS, + "Return number of bytes in the buffer.\n" }, + + { "space", + (PyCFunction)xu_buffer_space, + METH_VARARGS, + "Return space left in the buffer.\n" }, + { NULL, NULL, 0, NULL } }; diff --git a/tools/python/xen/xend/XendConsole.py b/tools/python/xen/xend/XendConsole.py index 6825dc5baa..889a59db32 100644 --- a/tools/python/xen/xend/XendConsole.py +++ b/tools/python/xen/xend/XendConsole.py @@ -1,40 +1,29 @@ # Copyright (C) 2004 Mike Wray -import socket import xen.lowlevel.xc xc = xen.lowlevel.xc.new() -import sxp -import XendRoot -xroot = XendRoot.instance() -import XendDB +import XendRoot; xroot = XendRoot.instance() from XendError import XendError -import EventServer -eserver = EventServer.instance() - -from xen.xend.server import SrvDaemon -daemon = SrvDaemon.instance() - class XendConsole: def __init__(self): pass - eserver.subscribe('xend.domain.died', self.onDomainDied) - eserver.subscribe('xend.domain.destroy', self.onDomainDied) - - def onDomainDied(self, event, val): - pass def console_ls(self): return [ c.console_port for c in self.consoles() ] def consoles(self): - return daemon.get_consoles() - - def console_create(self, dom, console_port=None): - consinfo = daemon.console_create(dom, console_port=console_port) - return consinfo + l = [] + xd = XendRoot.get_component('xen.xend.XendDomain') + for vm in xd.domains(): + ctrl = vm.getDeviceController("console", error=False) + if (not ctrl): continue + console = ctrl.getDevice(0) + if (not console): continue + l.append(console) + return l def console_get(self, id): id = int(id) diff --git a/tools/python/xen/xend/XendDomain.py b/tools/python/xen/xend/XendDomain.py index 7f5218bff1..23077a55e4 100644 --- a/tools/python/xen/xend/XendDomain.py +++ b/tools/python/xen/xend/XendDomain.py @@ -6,30 +6,68 @@ """ import sys import traceback - -from twisted.internet import defer -#defer.Deferred.debug = 1 -from twisted.internet import reactor +import time import xen.lowlevel.xc; xc = xen.lowlevel.xc.new() import sxp -import XendRoot -xroot = XendRoot.instance() +import XendRoot; xroot = XendRoot.instance() import XendDB import XendDomainInfo import XendMigrate -import EventServer +import EventServer; eserver = EventServer.instance() from XendError import XendError from XendLogging import log +from scheduler import Scheduler -from xen.xend.server import SrvDaemon -xend = SrvDaemon.instance() +from xen.xend.server import channel -eserver = EventServer.instance() __all__ = [ "XendDomain" ] + +SHUTDOWN_TIMEOUT = 30 + +class DomainShutdown: + """A pending domain shutdown. The domain is asked to shut down, + if it has not terminated or rebooted when the timeout expires it + is destroyed. + """ + + def __init__(self, dominfo, reason, key, timeout=None): + if timeout is None: + timeout = SHUTDOWN_TIMEOUT + self.start = time.time() + self.timeout = timeout + self.dominfo = dominfo + self.last_restart_time = dominfo.restart_time + self.last_restart_count = dominfo.restart_count + self.reason = reason + self.key = key + + def getDomain(self): + return self.dominfo.id + + def getDomainName(self): + return self.dominfo.name + + def getReason(self): + return self.reason + + def getTimeout(self): + return self.timeout + + def isTerminated(self): + return self.dominfo.is_terminated() + + def isRestarted(self): + return (self.dominfo.restart_count > self.last_restart_count) + + def isShutdown(self): + return self.isTerminated() or self.isRestarted() + + def isExpired(self): + return (time.time() - self.start) > self.timeout class XendDomain: """Index of all domains. Singleton. @@ -46,8 +84,11 @@ class XendDomain: restarts_by_id = {} restarts_by_name = {} + """Table of pending domain shutdowns, indexed by domain id.""" + shutdowns_by_id = {} + """Table of delayed calls.""" - schedule = {} + scheduler = Scheduler() def __init__(self): # Hack alert. Python does not support mutual imports, but XendDomainInfo @@ -67,6 +108,7 @@ class XendDomain: def onVirq(self, event, val): """Event handler for virq. """ + print 'onVirq>', val self.reap() def schedule_later(self, _delay, _name, _fn, *args): @@ -77,22 +119,16 @@ class XendDomain: @param _fn: function @param args: arguments """ - if self.schedule.get(_name): return - self.schedule[_name] = reactor.callLater(_delay, _fn, *args) + self.scheduler.later(_delay, _name, _fn, args) def schedule_cancel(self, name): """Cancel a scheduled function call. @param name: schedule name to cancel """ - callid = self.schedule.get(name) - if not callid: - return - if callid.active(): - callid.cancel() - del self.schedule[name] + self.scheduler.cancel(name) - def reap_schedule(self, delay=0): + def reap_schedule(self, delay=1): """Schedule reap to be called later. @param delay: delay in seconds @@ -104,7 +140,7 @@ class XendDomain: """ self.schedule_cancel('reap') - def refresh_schedule(self, delay=0): + def refresh_schedule(self, delay=1): """Schedule refresh to be called later. @param delay: delay in seconds @@ -116,7 +152,7 @@ class XendDomain: """ self.schedule_cancel('refresh') - def domain_restarts_schedule(self, delay=0): + def domain_restarts_schedule(self, delay=1): """Schedule domain_restarts to be called later. @param delay: delay in seconds @@ -132,30 +168,45 @@ class XendDomain: """Remove all domain info. Used after reboot. """ for (k, v) in self.domain_db.items(): - self._delete_domain(k, notify=0) - - def initial_refresh(self): - """Refresh initial domain info from domain_db. - """ - - def cb_all_ok(val): - self.refresh() + self._delete_domain(k, notify=False) + def xen_domains(self): + """Get table of domains indexed by id from xc. + """ domlist = xc.domain_getinfo() doms = {} for d in domlist: domid = str(d['dom']) doms[domid] = d - dlist = [] + return doms + + def xen_domain(self, dom): + """Get info about a single domain from xc. + Returns None if not found. + """ + dom = int(dom) + dominfo = xc.domain_getinfo(dom, 1) + if dominfo == [] or dominfo[0]['dom'] != dom: + dominfo = None + else: + dominfo = dominfo[0] + return dominfo + + def initial_refresh(self): + """Refresh initial domain info from domain_db. + """ + doms = self.xen_domains() for config in self.domain_db.values(): domid = str(sxp.child_value(config, 'id')) if domid in doms: - d_dom = self._new_domain(config, doms[domid]) - dlist.append(d_dom) + try: + self._new_domain(config, doms[domid]) + except Exception, ex: + log.exception("Error recreating domain info: id=%s", domid) + self._delete_domain(domid) else: self._delete_domain(domid) - d_all = defer.DeferredList(dlist, fireOnOneErrback=1) - d_all.addCallback(cb_all_ok) + self.refresh() def sync(self): """Sync domain db to disk. @@ -177,35 +228,45 @@ class XendDomain: @param savedinfo: saved info from the db @param info: domain info from xen - @return: deferred + @return: domain """ - def cbok(dominfo): - self.domain_by_id[dominfo.id] = dominfo - self.domain_by_name[dominfo.name] = dominfo - if dominfo.restart_pending(): - self.domain_restart_add(dominfo) - - deferred = XendDomainInfo.vm_recreate(savedinfo, info) - deferred.addCallback(cbok) - return deferred + dominfo = XendDomainInfo.vm_recreate(savedinfo, info) + self.domain_by_id[dominfo.id] = dominfo + self.domain_by_name[dominfo.name] = dominfo + if dominfo.restart_pending(): + self.domain_restart_add(dominfo) + return dominfo - def _add_domain(self, info, notify=1): + def _add_domain(self, info, notify=True): """Add a domain entry to the tables. @param info: domain info object @param notify: send a domain created event if true """ + # Remove entries under the wrong id. + for i, d in self.domain_by_id.items(): + if i != d.id: + del self.domain_by_id[i] + if i in self.domain_db: + del self.domain_db[i] + self.db.delete(i) + # Remove entries under the wrong name. + for n, d in self.domain_by_name.items(): + if n != d.name: + del self.domain_by_name[n] + # But also need to make sure are indexed under correct name. + # What about entries under info.name ? + if info.id in self.domain_by_id: + notify = False self.domain_by_id[info.id] = info self.domain_db[info.id] = info.sxpr() - for k, d in self.domain_by_name.items(): - if k != d.name: - del self.domain_by_name[k] if info.name: self.domain_by_name[info.name] = info self.sync_domain(info.id) - if notify: eserver.inject('xend.domain.create', [info.name, info.id]) + if notify: + eserver.inject('xend.domain.create', [info.name, info.id]) - def _delete_domain(self, id, notify=1): + def _delete_domain(self, id, notify=True): """Remove a domain from the tables. @param id: domain id @@ -214,10 +275,11 @@ class XendDomain: for (k, info) in self.domain_by_name.items(): if info.id == id: del self.domain_by_name[k] - if id in self.domain_by_id: - info = self.domain_by_id[id] + info = self.domain_by_id.get(id) + if info: del self.domain_by_id[id] - if notify: eserver.inject('xend.domain.died', [info.name, info.id]) + if notify: + eserver.inject('xend.domain.died', [info.name, info.id]) if id in self.domain_db: del self.domain_db[id] self.db.delete(id) @@ -227,9 +289,9 @@ class XendDomain: Tidy them up. """ self.reap_cancel() - domlist = xc.domain_getinfo() casualties = [] - for d in domlist: + doms = self.xen_domains() + for d in doms.values(): dead = 0 dead = dead or (d['crashed'] or d['shutdown']) dead = dead or (d['dying'] and @@ -239,8 +301,12 @@ class XendDomain: destroyed = 0 for d in casualties: id = str(d['dom']) + print 'reap>', id dominfo = self.domain_by_id.get(id) name = (dominfo and dominfo.name) or '??' + if dominfo and dominfo.is_terminated(): + print 'reap> already terminated:', id + continue log.debug('XendDomain>reap> domain died name=%s id=%s', name, id) if d['shutdown']: reason = XendDomainInfo.shutdown_reason(d['shutdown_reason']) @@ -255,36 +321,34 @@ class XendDomain: eserver.inject('xend.domain.exit', [name, id, reason]) self.domain_restart_schedule(id, reason) else: - eserver.inject('xend.domain.exit', [name, id, 'crash']) + eserver.inject('xend.domain.exit', [name, id, 'crash']) destroyed += 1 self.final_domain_destroy(id) if self.domain_restarts_exist(): self.domain_restarts_schedule() if destroyed: - self.refresh_schedule(delay=1) + self.refresh_schedule(delay=5) def refresh(self): """Refresh domain list from Xen. """ self.refresh_cancel() - domlist = xc.domain_getinfo() - # Index the domlist by id. + doms = self.xen_domains() # Add entries for any domains we don't know about. - doms = {} - for d in domlist: - id = str(d['dom']) - doms[id] = d + for (id, d) in doms.items(): if id not in self.domain_by_id: + log.warning("Created entry for unknown domain: %s", id) savedinfo = None - deferred = XendDomainInfo.vm_recreate(savedinfo, d) - def cbok(dominfo): - self._add_domain(dominfo) - deferred.addCallback(cbok) + dominfo = XendDomainInfo.vm_recreate(savedinfo, d) + self._add_domain(dominfo) # Remove entries for domains that no longer exist. + # Update entries for existing domains. for d in self.domain_by_id.values(): info = doms.get(d.id) if info: d.update(info) + elif d.restart_pending(): + pass else: self._delete_domain(d.id) self.reap_schedule(delay=1) @@ -304,19 +368,13 @@ class XendDomain: @param id: domain id """ - dom = int(id) - dominfo = xc.domain_getinfo(dom, 1) - if dominfo == [] or dominfo[0]['dom'] != dom: - try: - self._delete_domain(id) - except: - log.exception('refresh_domain> error') - raise - pass - else: + dominfo = xen_domain(id) + if dominfo: d = self.domain_by_id.get(id) if d: - d.update(dominfo[0]) + d.update(dominfo) + else: + self._delete_domain(id) def domain_ls(self): """Get list of domain names. @@ -346,30 +404,33 @@ class XendDomain: """Create a domain from a configuration. @param config: configuration - @return: deferred + @return: domain """ - def cbok(dominfo): - self._add_domain(dominfo) - return dominfo - deferred = XendDomainInfo.vm_create(config) - deferred.addCallback(cbok) - return deferred + dominfo = XendDomainInfo.vm_create(config) + self._add_domain(dominfo) + return dominfo def domain_restart(self, dominfo): """Restart a domain. @param dominfo: domain object - @return: deferred """ - def cbok(dominfo): - self._add_domain(dominfo) - return dominfo log.info("Restarting domain: id=%s name=%s", dominfo.id, dominfo.name) eserver.inject("xend.domain.restart", [dominfo.name, dominfo.id, "begin"]) - deferred = dominfo.restart() - deferred.addCallback(cbok) - return deferred + try: + dominfo.restart() + self._add_domain(dominfo) + log.info('Restarted domain name=%s id=%s', dominfo.name, dominfo.id) + eserver.inject("xend.domain.restart", + [dominfo.name, dominfo.id, "success"]) + self.domain_unpause(dominfo.id) + except Exception, ex: + log.exception("Exception restarting domain: name=%s id=%s", + dominfo.name, dominfo.id) + eserver.inject("xend.domain.restart", + [dominfo.name, dominfo.id, "fail"]) + return dominfo def domain_configure(self, id, vmconfig): """Configure an existing domain. This is intended for internal @@ -377,38 +438,25 @@ class XendDomain: @param id: domain id @param vmconfig: vm configuration - @return: deferred """ config = sxp.child_value(vmconfig, 'config') dominfo = self.domain_lookup(id) log.debug('domain_configure> id=%s config=%s', str(id), str(config)) if dominfo.config: raise XendError("Domain already configured: " + dominfo.id) - def cbok(dominfo): - self._add_domain(dominfo) - return dominfo - deferred = dominfo.dom_construct(dominfo.dom, config) - deferred.addCallback(cbok) - return deferred + dominfo.dom_construct(dominfo.dom, config) + self._add_domain(dominfo) + return dominfo - def domain_restore(self, src, progress=0): + def domain_restore(self, src, progress=False): """Restore a domain from file. @param src: source file @param progress: output progress if true @return: deferred """ - - if 0: - def cbok(dominfo): - self._add_domain(dominfo) - return dominfo - deferred = XendDomainInfo.vm_restore(src, progress=progress) - deferred.addCallback(cbok) - else: - xmigrate = XendMigrate.instance() - deferred = xmigrate.restore_begin(src) - return deferred + xmigrate = XendMigrate.instance() + return xmigrate.restore_begin(src) def domain_get(self, id): """Get up-to-date info about a domain. @@ -425,7 +473,7 @@ class XendDomain: dominfo = self.domain_by_name.get(name) or self.domain_by_id.get(name) if dominfo: return dominfo - raise XendError('invalid domain:' + name) + raise XendError('invalid domain: ' + name) def domain_exists(self, name): name = str(name) @@ -470,15 +518,54 @@ class XendDomain: if reason == 'halt': self.domain_restart_cancel(dominfo.id) else: - self.domain_restart_schedule(dominfo.id, reason, force=1) + self.domain_restart_schedule(dominfo.id, reason, force=True) eserver.inject('xend.domain.shutdown', [dominfo.name, dominfo.id, reason]) if reason == 'halt': reason = 'poweroff' - val = xend.domain_shutdown(dominfo.id, reason, key) - self.refresh_schedule() + val = dominfo.shutdown(reason, key=key) + self.add_shutdown(dominfo, reason, key) + self.refresh_schedule(delay=10) return val - def domain_restart_schedule(self, id, reason, force=0): + def add_shutdown(self, dominfo, reason, key): + """Add a pending shutdown for a domain. + This will destroy the domain if the shutdown times out. + """ + if dominfo.id in self.shutdowns_by_id: + return + self.shutdowns_by_id[dominfo.id] = DomainShutdown(dominfo, reason, key) + self.domain_shutdowns() + + def domain_shutdowns(self): + """Process pending domain shutdowns. + Destroys domains whose shutdowns have timed out. + """ + self.schedule_cancel('domain_shutdowns') + timeout = SHUTDOWN_TIMEOUT + for shutdown in self.shutdowns_by_id.values(): + id = shutdown.getDomain() + if shutdown.isShutdown(): + # Shutdown done - remove. + print 'domain_shutdowns> done: ', id + del self.shutdowns_by_id[id] + elif shutdown.isExpired(): + # Shutdown expired - remove and destroy domain. + del self.shutdowns_by_id[id] + try: + log.info("Domain shutdown timeout expired: name=%s id=%s", + shutdown.getDomainName(), id) + self.domain_destroy(id, reason=shutdown.getReason()) + except Exception: + pass + else: + # Shutdown still pending. + print 'domain_shutdowns> pending: ', id + timeout = min(timeout, shutdown.getTimeout()) + if self.shutdowns_by_id: + # Pending shutdowns remain - reschedule. + self.schedule_later(timeout, 'domain_shutdowns', self.domain_shutdowns) + + def domain_restart_schedule(self, id, reason, force=False): """Schedule a restart for a domain if it needs one. @param id: domain id @@ -510,7 +597,8 @@ class XendDomain: """ dominfo = self.restarts_by_id.get(id) or self.restarts_by_name.get(id) if dominfo: - log.info('Cancelling restart for domain: name=%s id=%s', dominfo.name, dominfo.id) + log.info('Cancelling restart for domain: name=%s id=%s', + dominfo.name, dominfo.id) eserver.inject("xend.domain.restart", [dominfo.name, dominfo.id, "cancel"]) dominfo.restart_cancel() @@ -521,36 +609,22 @@ class XendDomain: """Execute any scheduled domain restarts for domains that have gone. """ self.domain_restarts_cancel() + doms = self.xen_domains() for dominfo in self.restarts_by_id.values(): - if dominfo.id in self.domain_by_id: + print 'domain_restarts>', dominfo.name, dominfo.id + info = doms.get(dominfo.id) + if info: # Don't execute restart for domains still running. + print 'domain_restarts> still runnning: ', dominfo.name continue # Remove it from the restarts. del self.restarts_by_id[dominfo.id] del self.restarts_by_name[dominfo.name] - try: - def cbok(dominfo): - log.info('Restarted domain name=%s id=%s', dominfo.name, dominfo.id) - eserver.inject("xend.domain.restart", - [dominfo.name, dominfo.id, "success"]) - self.domain_unpause(dominfo.id) - def cberr(err): - log.exception("Delayed exception restarting domain: name=%s id=%s", - dominfo.name, dominfo.id) - eserver.inject("xend.domain.restart", - [dominfo.name, dominfo.id, "fail"]) - - deferred = self.domain_restart(dominfo) - deferred.addCallback(cbok) - deferred.addErrback(cberr) - except: - log.exception("Exception restarting domain: name=%s id=%s", - dominfo.name, dominfo.id) - eserver.inject("xend.domain.restart", - [dominfo.name, dominfo.id, "fail"]) + print 'domain_restarts> restarting: ', dominfo.name + self.domain_restart(dominfo) if self.domain_restarts_exist(): # Run again later if any restarts remain. - self.refresh_schedule(delay=5) + self.refresh_schedule(delay=10) def domain_restarts_exist(self): return len(self.restarts_by_id) @@ -560,14 +634,17 @@ class XendDomain: @param id: domain id """ - dominfo = self.domain_lookup(id) - log.info('Destroying domain: name=%s', dominfo.name) - eserver.inject('xend.domain.destroy', [dominfo.name, dominfo.id]) - if dominfo: + try: + dominfo = self.domain_lookup(id) + log.info('Destroying domain: name=%s', dominfo.name) + eserver.inject('xend.domain.destroy', [dominfo.name, dominfo.id]) val = dominfo.destroy() - else: + except: #todo - val = xc.domain_destroy(dom=dominfo.dom) + try: + val = xc.domain_destroy(dom=int(id)) + except Exception, ex: + raise XendError(str(ex)) return val def domain_destroy(self, id, reason='halt'): @@ -580,12 +657,12 @@ class XendDomain: if reason == 'halt': self.domain_restart_cancel(id) elif reason == 'reboot': - self.domain_restart_schedule(id, reason, force=1) + self.domain_restart_schedule(id, reason, force=True) val = self.final_domain_destroy(id) self.refresh_schedule() return val - def domain_migrate(self, id, dst, live=0, resource=0): + def domain_migrate(self, id, dst, live=False, resource=0): """Start domain migration. @param id: domain id @@ -595,10 +672,9 @@ class XendDomain: # Don't forget to cancel restart for it. dominfo = self.domain_lookup(id) xmigrate = XendMigrate.instance() - val = xmigrate.migrate_begin(dominfo, dst, live=live, resource=resource) - return val + return xmigrate.migrate_begin(dominfo, dst, live=live, resource=resource) - def domain_save(self, id, dst, progress=0): + def domain_save(self, id, dst, progress=False): """Start saving a domain to file. @param id: domain id @@ -647,7 +723,6 @@ class XendDomain: @param id: domain id @param devconfig: device configuration - @return: deferred """ dominfo = self.domain_lookup(id) self.refresh_schedule() @@ -703,8 +778,7 @@ class XendDomain: @return: device indexes """ dominfo = self.domain_lookup(id) - devs = dominfo.get_devices(type) - return devs + return dominfo.get_devices(type) def domain_devtype_get(self, id, type, idx): """Get a device from a domain. @@ -787,7 +861,8 @@ class XendDomain: raise XendError(str(ex)) def domain_mem_target_set(self, id, target): - return xend.domain_mem_target_set(id, target) + dominfo = self.domain_lookup(id) + return dominfo.mem_target_set(target) diff --git a/tools/python/xen/xend/XendDomainInfo.py b/tools/python/xen/xend/XendDomainInfo.py index e97266b303..d576606890 100644 --- a/tools/python/xen/xend/XendDomainInfo.py +++ b/tools/python/xen/xend/XendDomainInfo.py @@ -9,30 +9,25 @@ Author: Mike Wray """ import string -import types -import re -import sys import os import time -from twisted.internet import defer - import xen.lowlevel.xc; xc = xen.lowlevel.xc.new() import xen.util.ip from xen.util.ip import _readline, _readlines -from xen.xend.server import channel +from xen.xend.server import channel, controller -import sxp +from server.channel import channelFactory +import server.SrvDaemon; xend = server.SrvDaemon.instance() +from server import messages -import XendConsole -xendConsole = XendConsole.instance() +import sxp from XendLogging import log +from XendError import VmError from XendRoot import get_component +#import XendConsole; xendConsole = XendConsole.instance() -import server.SrvDaemon -xend = server.SrvDaemon.instance() - -from XendError import VmError +from PrettyPrint import prettyprint """The length of domain names that Xen can handle. The names stored in Xen itself are not used for much, and @@ -48,8 +43,10 @@ SIF_NET_BE_DOMAIN = (1<<5) """Shutdown code for poweroff.""" DOMAIN_POWEROFF = 0 + """Shutdown code for reboot.""" DOMAIN_REBOOT = 1 + """Shutdown code for suspend.""" DOMAIN_SUSPEND = 2 @@ -59,6 +56,15 @@ shutdown_reasons = { DOMAIN_REBOOT : "reboot", DOMAIN_SUSPEND : "suspend" } +"""Map shutdown reasons to the message type to use. +""" +shutdown_messages = { + 'poweroff' : 'shutdown_poweroff_t', + 'reboot' : 'shutdown_reboot_t', + 'suspend' : 'shutdown_suspend_t', + 'sysrq' : 'shutdown_sysrq_t', + } + RESTART_ALWAYS = 'always' RESTART_ONREBOOT = 'onreboot' RESTART_NEVER = 'never' @@ -164,21 +170,12 @@ Indexed by device type. """ device_handlers = {} -def add_device_handler(name, h): - """Add a handler for a device type. - - @param name: device type - @param h: handler: fn(vm, dev) - """ - device_handlers[name] = h +def add_device_handler(name, type): + device_handlers[name] = type def get_device_handler(name): - """Get the handler for a device type. - - @param name : device type - @return; handler or None - """ - return device_handlers.get(name) + return device_handlers[name] + def vm_create(config): """Create a VM from a configuration. @@ -186,11 +183,11 @@ def vm_create(config): is destroyed. @param config configuration - @return: Deferred @raise: VmError for invalid configuration """ vm = XendDomainInfo() - return vm.construct(config) + vm.construct(config) + return vm def vm_recreate(savedinfo, info): """Create the VM object for an existing domain. @@ -199,37 +196,37 @@ def vm_recreate(savedinfo, info): @type savedinfo: sxpr @param info: domain info from xc @type info: xc domain dict - @return: deferred """ + print 'vm_recreate>' + print 'savedinfo=' ; prettyprint(savedinfo) + print 'info=', info vm = XendDomainInfo() - vm.recreate = 1 + vm.recreate = True vm.savedinfo = savedinfo vm.setdom(info['dom']) - #vm.name = info['name'] vm.memory = info['mem_kb']/1024 start_time = sxp.child_value(savedinfo, 'start_time') if start_time is not None: vm.start_time = float(start_time) vm.restart_state = sxp.child_value(savedinfo, 'restart_state') + vm.restart_count = int(sxp.child_value(savedinfo, 'restart_count', 0)) restart_time = sxp.child_value(savedinfo, 'restart_time') if restart_time is not None: vm.restart_time = float(restart_time) config = sxp.child_value(savedinfo, 'config') if config: - d = vm.construct(config) + vm.construct(config) else: vm.name = sxp.child_value(savedinfo, 'name', "Domain-%d" % info['dom']) - d = defer.succeed(vm) - vm.recreate = 0 + vm.recreate = False vm.savedinfo = None - return d + return vm -def vm_restore(src, progress=0): +def vm_restore(src, progress=False): """Restore a VM from a disk image. src saved state to restore progress progress reporting flag - returns deferred raises VmError for invalid configuration """ vm = XendDomainInfo() @@ -244,12 +241,9 @@ def vm_restore(src, progress=0): config = sxp.child_value(vmconfig, 'config') except Exception, ex: raise VmError('config error: ' + str(ex)) - deferred = vm.dom_construct(dom, config) - def vifs_cb(val, vm): - vif_up(vm.ipaddrs) - return vm - deferred.addCallback(vifs_cb, vm) - return deferred + vm.dom_construct(dom, config) + vif_up(vm.ipaddrs) + return vm def dom_get(dom): """Get info from xen for an existing domain. @@ -262,27 +256,6 @@ def dom_get(dom): return domlist[0] return None -def append_deferred(dlist, v): - """Append a value to a deferred list if it is a deferred. - - @param dlist: list of deferreds - @param v: value to add - """ - if isinstance(v, defer.Deferred): - dlist.append(v) - -def dlist_err(val): - """Error callback suitable for a deferred list. - In a deferred list the error callback is called with with Failure((error, index)). - This callback extracts the error and returns it. - - @param val: Failure containing (error, index) - @type val: twisted.internet.failure.Failure - """ - - (error, index) = val.value - return error - class XendDomainInfo: """Virtual machine object.""" @@ -303,25 +276,31 @@ class XendDomainInfo: self.image = None self.ramdisk = None self.cmdline = None - self.console = None + + self.channel = None + self.controllers = {} self.devices = {} - self.device_index = {} + self.configs = [] + self.info = None self.ipaddrs = [] - self.blkif_backend = 0 - self.netif_backend = 0 + self.blkif_backend = False + self.netif_backend = False #todo: state: running, suspended self.state = STATE_VM_OK #todo: set to migrate info if migrating self.migrate = None + self.restart_mode = RESTART_ONREBOOT self.restart_state = None self.restart_time = None + self.restart_count = 0 + self.console_port = None self.savedinfo = None self.image_handler = None - self.is_vmx = 0 + self.is_vmx = False self.vcpus = 1 def setdom(self, dom): @@ -332,6 +311,15 @@ class XendDomainInfo: self.dom = int(dom) self.id = str(dom) + def getDomain(self): + return self.dom + + def getName(self): + return self.name + + def getChannel(self): + return self.channel + def update(self, info): """Update with info from xc.domain_getinfo(). """ @@ -343,8 +331,9 @@ class XendDomainInfo: s += " id=" + self.id s += " name=" + self.name s += " memory=" + str(self.memory) - if self.console: - s += " console=" + str(self.console.console_port) + console = self.getConsole() + if console: + s += " console=" + str(console.console_port) if self.image: s += " image=" + self.image s += "" @@ -352,6 +341,71 @@ class XendDomainInfo: __repr__ = __str__ + def getDeviceTypes(self): + return self.controllers.keys() + + def getDeviceControllers(self): + return self.controllers.values() + + def getDeviceController(self, type, error=True): + ctrl = self.controllers.get(type) + if not ctrl and error: + raise XendError("invalid device type:" + type) + return ctrl + + def findDeviceController(self, type): + return (self.getDeviceController(type, error=False) + or self.createDeviceController(type)) + + def createDeviceController(self, type): + ctrl = controller.createDevController(type, self, recreate=self.recreate) + self.controllers[type] = ctrl + return ctrl + + def createDevice(self, type, devconfig, recreate=False): + ctrl = self.findDeviceController(type) + return ctrl.createDevice(devconfig, recreate=self.recreate) + + def configureDevice(self, type, id, devconfig): + ctrl = self.getDeviceController(type) + return ctrl.configureDevice(id, devconfig) + + def destroyDevice(self, type, id, change=False, reboot=False): + ctrl = self.getDeviceController(type) + return ctrl.destroyDevice(id, change=change, reboot=reboot) + + def deleteDevice(self, type, id): + ctrl = self.getDeviceController(type) + return ctrl.deleteDevice(id) + + def getDevice(self, type, id): + ctrl = self.getDeviceController(type) + return ctrl.getDevice(id) + + def getDeviceByIndex(self, type, idx): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceByIndex(idx) + + def getDeviceIndex(self, type, dev): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceIndex(dev) + + def getDeviceConfig(self, type, id): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceConfig(id) + + def getDeviceIds(self, type): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceIds() + + def getDeviceConfigs(self, type): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceConfigs() + + def getDeviceSxprs(self, type): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceSxprs() + def sxpr(self): sxpr = ['domain', ['id', self.id], @@ -378,8 +432,13 @@ class XendDomainInfo: sxpr.append(['up_time', str(up_time) ]) sxpr.append(['start_time', str(self.start_time) ]) - if self.console: - sxpr.append(self.console.sxpr()) + if self.channel: + sxpr.append(self.channel.sxpr()) + console = self.getConsole() + if console: + sxpr.append(console.sxpr()) + if self.restart_count: + sxpr.append(['restart_count', self.restart_count]) if self.restart_state: sxpr.append(['restart_state', self.restart_state]) if self.restart_time: @@ -393,10 +452,10 @@ class XendDomainInfo: def sxpr_devices(self): sxpr = ['devices'] - for devs in self.devices.values(): - for dev in devs: - if hasattr(dev, 'sxpr'): - sxpr.append(dev.sxpr()) + for ty in self.getDeviceTypes(): + devs = [ ty ] + devs += self.getDeviceSxprs(ty) + sxpr.append(devs) return sxpr def check_name(self, name): @@ -432,46 +491,55 @@ class XendDomainInfo: """Construct the vm instance from its configuration. @param config: configuration - @return: deferred @raise: VmError on error """ # todo - add support for scheduling params? self.config = config try: + # Initial domain create. self.name = sxp.child_value(config, 'name') self.check_name(self.name) - try: - self.cpu_weight = float(sxp.child_value(config, 'cpu_weight', '1')) - except: - raise VmError('invalid cpu weight') - self.memory = int(sxp.child_value(config, 'memory')) - if self.memory is None: - raise VmError('missing memory size') - cpu = sxp.child_value(config, 'cpu') - if self.recreate and self.dom and cpu is not None: - xc.domain_pincpu(self.dom, int(cpu)) - try: - image = sxp.child_value(self.config, 'image') - self.vcpus = int(sxp.child_value(image, 'vcpus')) - except: - raise VmError('invalid vcpus value') - + self.configure_cpus(config) self.find_image_handler() self.init_domain() - self.configure_console() + self.register_domain() + + # Create domain devices. self.configure_backends() - self.construct_image() + self.configure_console() self.configure_restart() - deferred = self.configure() - def cberr(err): - self.destroy() - return err - deferred.addErrback(cberr) - except StandardError, ex: + self.construct_image() + self.configure() + except Exception, ex: # Catch errors, cleanup and re-raise. + print 'Domain construction error:', ex + import traceback + traceback.print_exc() self.destroy() raise - return deferred + + def register_domain(self): + xd = get_component('xen.xend.XendDomain') + xd._add_domain(self) + + def configure_cpus(self, config): + try: + self.cpu_weight = float(sxp.child_value(config, 'cpu_weight', '1')) + except: + raise VmError('invalid cpu weight') + self.memory = int(sxp.child_value(config, 'memory')) + if self.memory is None: + raise VmError('missing memory size') + cpu = sxp.child_value(config, 'cpu') + if self.recreate and self.dom and cpu is not None: + xc.domain_pincpu(self.dom, int(cpu)) + try: + image = sxp.child_value(self.config, 'image') + vcpus = sxp.child_value(image, 'vcpus') + if vcpus: + self.vcpus = int(vcpus) + except: + raise VmError('invalid vcpus value') def find_image_handler(self): """Construct the boot image for the domain. @@ -485,7 +553,7 @@ class XendDomainInfo: if image_name is None: raise VmError('missing image name') if image_name == "vmx": - self.is_vmx = 1 + self.is_vmx = True image_handler = get_image_handler(image_name) if image_handler is None: raise VmError('unknown image type: ' + image_name) @@ -513,107 +581,18 @@ class XendDomainInfo: devices.append(dev) return devices - def config_device(self, type, idx): - """Get a device config from the device nodes of a given type - from the config. - - @param type: device type - @type type: string - @param idx: index - @type idx: int - @return config or None - """ - devs = self.config_devices(type) - if 0 <= idx < len(devs): - return devs[idx] - else: - return None - - def next_device_index(self, type): - """Get the next index for a given device type. - - @param type: device type - @type type: string - @return device index - @rtype: int - """ - idx = self.device_index.get(type, 0) - self.device_index[type] = idx + 1 - return idx - - def add_device(self, type, dev): - """Add a device to a virtual machine. - - @param type: device type - @param dev: device to add - """ - dl = self.devices.get(type, []) - dl.append(dev) - self.devices[type] = dl - - def refresh_device(self, type, dev): - """Refresh a device to a virtual machine. - - @param type: device type - @param dev: device - """ - dl = self.devices.get(type, []) - if dev in dl: - dl.refresh(dev) - - def remove_device(self, type, dev): - """Remove a device from a virtual machine. - - @param type: device type - @param dev: device - """ - dl = self.devices.get(type, []) - if dev in dl: - dl.remove(dev) - - def get_devices(self, type): - """Get a list of the devices of a given type. - - @param type: device type - @return: devices - """ - val = self.devices.get(type, []) - return val - - def get_device_by_id(self, type, id): - """Get the device with the given id. - - @param id: device id - @return: device or None - """ - dl = self.get_devices(type) - for d in dl: - if d.getprop('id') == id: - return d - return None - - def get_device_by_index(self, type, idx): - """Get the device with the given index. - - @param idx: device index - @return: device or None - """ - idx = str(idx) - dl = self.get_devices(type) - for d in dl: - if d.getidx() == idx: - return d - return None - def get_device_savedinfo(self, type, index): val = None if self.savedinfo is None: return val - index = str(index) devinfo = sxp.child(self.savedinfo, 'devices') if devinfo is None: return val - for d in sxp.children(devinfo, type): + devs = sxp.child(devinfo, type) + if devs is None: + return val + index = str(index) + for d in sxp.children(devs): dindex = sxp.child_value(d, 'index') if dindex is None: continue if str(dindex) == index: @@ -631,10 +610,11 @@ class XendDomainInfo: @param period: vif period in uSec @return: 0 on success """ - - ctrl = xend.netif_create(self.dom, recreate=self.recreate) - d = ctrl.limitDevice(vif, credit, period) - return d + #todo: all wrong + #ctrl = xend.netif_create(self.dom, recreate=self.recreate) + #d = ctrl.limitDevice(vif, credit, period) + #return d + pass def add_config(self, val): """Add configuration data to a virtual machine. @@ -654,25 +634,15 @@ class XendDomainInfo: The domain will not finally go away unless all vm devices have been released. """ + if self.channel: + self.channel.close() + self.channel = None if self.dom is None: return 0 - self.destroy_console() - chan = xend.getDomChannel(self.dom) - if chan: - log.debug("Closing channel to domain %d", self.dom) - chan.close() try: return xc.domain_destroy(dom=self.dom) except Exception, err: log.exception("Domain destroy failed: %s", self.name) - def destroy_console(self): - if self.console: - if self.restart_pending(): - self.console.deregisterChannel() - else: - log.debug('Closing console, domain %s', self.id) - self.console.close() - def cleanup(self): """Cleanup vm resources: release devices. """ @@ -687,41 +657,15 @@ class XendDomainInfo: def release_devices(self): """Release all vm devices. """ - self.release_vifs() - self.release_vbds() - self.release_usbifs() - - self.devices = {} - self.device_index = {} - self.configs = [] - self.ipaddrs = [] - - def release_vifs(self): - """Release vm virtual network devices (vifs). - """ - if self.dom is None: return - ctrl = xend.netif_get(self.dom) - if ctrl: - log.debug("Destroying vifs for domain %d", self.dom) - ctrl.destroy() - - def release_vbds(self): - """Release vm virtual block devices (vbds). - """ - if self.dom is None: return - ctrl = xend.blkif_get(self.dom) - if ctrl: - log.debug("Destroying vbds for domain %d", self.dom) - ctrl.destroy() - - def release_usbifs(self): - """Release vm virtual USB devices (usbifs). - """ - if self.dom is None: return - ctrl = xend.usbif_get(self.dom) - if ctrl: - log.debug("Destroying usbifs for domain %d", self.dom) - ctrl.destroy() + reboot = self.restart_pending() + for ctrl in self.getDeviceControllers(): + if ctrl.isDestroyed(): continue + ctrl.destroyController(reboot=reboot) + if not reboot: + self.devices = {} + self.device_index = {} + self.configs = [] + self.ipaddrs = [] def show(self): """Print virtual machine info. @@ -752,10 +696,6 @@ class XendDomainInfo: return dom = self.dom or 0 memory = self.memory - name = self.name - # If the name is over the xen limit, use the end of it. - if len(name) > MAX_DOMAIN_NAME: - name = name[-MAX_DOMAIN_NAME:] try: cpu = int(sxp.child_value(self.config, 'cpu', '-1')) except: @@ -766,8 +706,8 @@ class XendDomainInfo: cpu= cpu, cpu_weight= cpu_weight) if dom <= 0: raise VmError('Creating domain failed: name=%s memory=%d' - % (name, memory)) - log.debug('init_domain> Created domain=%d name=%s memory=%d', dom, name, memory) + % (self.name, memory)) + log.debug('init_domain> Created domain=%d name=%s memory=%d', dom, self.name, memory) self.setdom(dom) def build_domain(self, ostype, kernel, ramdisk, cmdline, memmap): @@ -785,28 +725,29 @@ class XendDomainInfo: flags = 0 if self.netif_backend: flags |= SIF_NET_BE_DOMAIN if self.blkif_backend: flags |= SIF_BLK_BE_DOMAIN - if ostype == "vmx": - err = buildfn(dom = dom, - image = kernel, - control_evtchn = 0, - memsize = self.memory, - memmap = memmap, - cmdline = cmdline, - ramdisk = ramdisk, - flags = flags) - else: - log.warning('building dom with %d vcpus', self.vcpus) - err = buildfn(dom = dom, - image = kernel, - control_evtchn = self.console.getRemotePort(), - cmdline = cmdline, - ramdisk = ramdisk, - flags = flags, - vcpus = self.vcpus) - if err != 0: - raise VmError('Building domain failed: type=%s dom=%d err=%d' - % (ostype, dom, err)) - + #todo generalise this + if ostype == "vmx": + err = buildfn(dom = dom, + image = kernel, + control_evtchn = 0, + memsize = self.memory, + memmap = memmap, + cmdline = cmdline, + ramdisk = ramdisk, + flags = flags) + else: + log.warning('building dom with %d vcpus', self.vcpus) + err = buildfn(dom = dom, + image = kernel, + control_evtchn = self.channel.getRemotePort(), + cmdline = cmdline, + ramdisk = ramdisk, + flags = flags, + vcpus = self.vcpus) + if err != 0: + raise VmError('Building domain failed: type=%s dom=%d err=%d' + % (ostype, dom, err)) + def create_domain(self, ostype, kernel, ramdisk, cmdline, memmap=''): """Create a domain. Builds the image but does not configure it. @@ -817,87 +758,87 @@ class XendDomainInfo: """ self.create_channel() - if self.console: - self.console.registerChannel() - else: - self.console = xendConsole.console_create( - self.dom, console_port=self.console_port) self.build_domain(ostype, kernel, ramdisk, cmdline, memmap) self.image = kernel self.ramdisk = ramdisk self.cmdline = cmdline def create_channel(self): - """Create the channel to the domain. + """Create the control channel to the domain. If saved info is available recreate the channel using the saved ports. - - @return: channel """ local = 0 remote = 1 if self.savedinfo: - consinfo = sxp.child(self.savedinfo, "console") - if consinfo: - local = int(sxp.child_value(consinfo, "local_port", 0)) - remote = int(sxp.child_value(consinfo, "remote_port", 1)) - return xend.createDomChannel(self.dom, local_port=local, - remote_port=remote) - + info = sxp.child(self.savedinfo, "channel") + if info: + local = int(sxp.child_value(info, "local_port", 0)) + remote = int(sxp.child_value(info, "remote_port", 1)) + self.channel = channelFactory().openChannel(self.dom, + local_port=local, + remote_port=remote) + + def create_configured_devices(self): + devices = sxp.children(self.config, 'device') + indexes = {} + for d in devices: + dev_config = sxp.child0(d) + if dev_config is None: + raise VmError('invalid device') + dev_type = sxp.name(dev_config) + ctrl_type = get_device_handler(dev_type) + if ctrl_type is None: + raise VmError('unknown device type: ' + dev_type) + # Keep track of device indexes by type, so we can fish + # out saved info for recreation. + idx = indexes.get(dev_type, -1) + idx += 1 + indexes[ctrl_type] = idx + recreate = self.get_device_recreate(dev_type, idx) + self.createDevice(ctrl_type, dev_config, recreate=recreate) + def create_devices(self): """Create the devices for a vm. - @return: Deferred @raise: VmError for invalid devices """ - dlist = [] - devices = sxp.children(self.config, 'device') - index = {} - for d in devices: - dev = sxp.child0(d) - if dev is None: - raise VmError('invalid device') - dev_name = sxp.name(dev) - dev_index = index.get(dev_name, 0) - dev_handler = get_device_handler(dev_name) - if dev_handler is None: - raise VmError('unknown device type: ' + dev_name) - v = dev_handler(self, dev, dev_index) - append_deferred(dlist, v) - index[dev_name] = dev_index + 1 - deferred = defer.DeferredList(dlist, fireOnOneErrback=1) - deferred.addErrback(dlist_err) + if self.rebooting(): + for ctrl in self.getDeviceControllers(): + ctrl.initController(reboot=True) + else: + self.create_configured_devices() if self.is_vmx: - device_model = sxp.child_value(self.config, 'device_model') - device_config = sxp.child_value(self.config, 'device_config') - memory = sxp.child_value(self.config, "memory") - # Create an event channel - device_channel = channel.eventChannel(0, self.dom) - # Fork and exec device_model -f device_config - os.system(device_model - + " -f %s" % device_config - + " -d %d" % self.dom - + " -p %d" % device_channel['port1'] - + " -m %s" % memory) - return deferred + self.create_vmx_model() + + def create_vmx_model(self): + #todo: remove special case for vmx + device_model = sxp.child_value(self.config, 'device_model') + if not device_model: + raise VmError("vmx: missing device model") + device_config = sxp.child_value(self.config, 'device_config') + if not device_config: + raise VmError("vmx: missing device config") + #todo: self.memory? + memory = sxp.child_value(self.config, "memory") + # Create an event channel + device_channel = channel.eventChannel(0, self.dom) + # Execute device model. + #todo: Error handling + os.system(device_model + + " -f %s" % device_config + + " -d %d" % self.dom + + " -p %d" % device_channel['port1'] + + " -m %s" % memory) def device_create(self, dev_config): """Create a new device. @param dev_config: device configuration - @return: deferred """ - dev_name = sxp.name(dev_config) - dev_handler = get_device_handler(dev_name) - if dev_handler is None: - raise VmError('unknown device type: ' + dev_name) - devs = self.get_devices(dev_name) - dev_index = len(devs) - self.config.append(['device', dev_config]) - d = dev_handler(self, dev_config, dev_index, change=1) - def cbok(dev): - return dev.sxpr() - d.addCallback(cbok) - return d + dev_type = sxp.name(dev_config) + dev = self.createDevice(self, dev_config, change=True) + self.config.append(['device', dev.getConfig()]) + return dev.sxpr() def device_configure(self, dev_config, idx): """Configure an existing device. @@ -906,16 +847,11 @@ class XendDomainInfo: @param idx: device index """ type = sxp.name(dev_config) - dev = self.get_device_by_index(type, idx) + dev = self.getDeviceByIndex(type, idx) if not dev: raise VmError('invalid device: %s %s' % (type, idx)) - new_config = dev.configure(dev_config, change=1) - devs = self.devices.get(type) - index = devs.index(dev) - # Patch new config into device configs. - dev_configs = self.config_devices(type) - old_config = dev_configs[index] - dev_configs[index] = new_config + old_config = dev.getConfig() + new_config = dev.configure(dev_config, change=True) # Patch new config into vm config. new_full_config = ['device', new_config] old_full_config = ['device', old_config] @@ -929,37 +865,24 @@ class XendDomainInfo: @param type: device type @param idx: device index """ - dev = self.get_device_by_index(type, idx) + dev = self.getDeviceByIndex(type, idx) if not dev: raise VmError('invalid device: %s %s' % (type, idx)) - devs = self.devices.get(type) dev.refresh() - #self.refresh_device(type, dev) - def device_destroy(self, type, idx): - """Destroy a device. + def device_delete(self, type, idx): + """Destroy and remove a device. @param type: device type @param idx: device index """ - dev = self.get_device_by_index(type, idx) + dev = self.getDeviceByIndex(type, idx) if not dev: raise VmError('invalid device: %s %s' % (type, idx)) - devs = self.devices.get(type) - index = devs.index(dev) - dev_config = self.config_device(type, index) + dev_config = dev.getConfig() if dev_config: self.config.remove(['device', dev_config]) - dev.destroy(change=1) - self.remove_device(type, dev) - - def configure_memory(self): - """Configure vm memory limit. - """ - maxmem = sxp.child_value(self.config, "maxmem") - if maxmem is None: - maxmem = self.memory - xc.domain_setmaxmem(self.dom, maxmem_kb = maxmem * 1024) + self.deleteDevice(type, dev.getId()) def configure_console(self): """Configure the vm console port. @@ -985,15 +908,15 @@ class XendDomainInfo: for the given reason. @param reason: shutdown reason - @return 1 if needs restaert, 0 otherwise + @return True if needs restart, False otherwise """ if self.restart_mode == RESTART_NEVER: - return 0 + return False if self.restart_mode == RESTART_ALWAYS: - return 1 + return True if self.restart_mode == RESTART_ONREBOOT: return reason == 'reboot' - return 0 + return False def restart_cancel(self): """Cancel a vm restart. @@ -1010,6 +933,9 @@ class XendDomainInfo: """ return self.restart_state == STATE_RESTART_PENDING + def rebooting(self): + return self.restart_state == STATE_RESTART_BOOTING + def restart_check(self): """Check if domain restart is OK. To prevent restart loops, raise an error if it is @@ -1024,20 +950,20 @@ class XendDomainInfo: log.error(msg) raise VmError(msg) self.restart_time = tnow + self.restart_count += 1 def restart(self): """Restart the domain after it has exited. Reuses the domain id and console port. - @return: deferred """ try: + self.state = STATE_VM_OK self.restart_check() self.restart_state = STATE_RESTART_BOOTING - d = self.construct(self.config) + self.construct(self.config) finally: self.restart_state = None - return d def configure_backends(self): """Set configuration flags if the vm is a backend for netif or blkif. @@ -1047,73 +973,70 @@ class XendDomainInfo: v = sxp.child0(c) name = sxp.name(v) if name == 'blkif': - self.blkif_backend = 1 + self.blkif_backend = True elif name == 'netif': - self.netif_backend = 1 + self.netif_backend = True elif name == 'usbif': - self.usbif_backend = 1 + self.usbif_backend = True else: raise VmError('invalid backend type:' + str(name)) def configure(self): """Configure a vm. - @return: deferred - calls callback with vm """ - d = self.create_devices() - d.addCallback(lambda x: self.create_blkif()) - d.addCallback(self._configure) - return d - - def _configure(self, val): - d = self.configure_fields() - def cbok(results): - return self - def cberr(err): - self.destroy() - return err - d.addCallback(cbok) - d.addErrback(cberr) - return d + self.configure_fields() + self.create_console() + self.create_devices() + self.create_blkif() + + def create_console(self): + console = self.getConsole() + if not console: + config = ['console'] + if self.console_port: + config.append(['console_port', self.console_port]) + console = self.createDevice('console', config) + return console + + def getConsole(self): + console_ctrl = self.getDeviceController("console", error=False) + if console_ctrl: + return console_ctrl.getDevice(0) + return None def create_blkif(self): """Create the block device interface (blkif) for the vm. The vm needs a blkif even if it doesn't have any disks at creation time, for example when it uses NFS root. - @return: deferred """ - if self.get_devices("vbd") == []: - ctrl = xend.blkif_create(self.dom, recreate=self.recreate) - back = ctrl.getBackendInterface(0) - return back.connect(recreate=self.recreate) - else: - return None + blkif = self.getDeviceController("blkif", error=False) + if not blkif: + blkif = self.createDeviceController("blkif") + backend = blkif.getBackend(0) + backend.connect(recreate=self.recreate) def dom_construct(self, dom, config): """Construct a vm for an existing domain. @param dom: domain id @param config: domain configuration - @return: deferred """ d = dom_get(dom) if not d: raise VmError("Domain not found: %d" % dom) try: - self.restore = 1 + self.restore = True self.setdom(dom) - #self.name = d['name'] self.memory = d['mem_kb']/1024 - deferred = self.construct(config) + self.construct(config) finally: - self.restore = 0 - return deferred + self.restore = False def configure_fields(self): """Process the vm configuration fields using the registered handlers. """ - dlist = [] index = {} for field in sxp.children(self.config): field_name = sxp.name(field) @@ -1122,13 +1045,9 @@ class XendDomainInfo: # Ignore unknown fields. Warn? if field_handler: v = field_handler(self, self.config, field, field_index) - append_deferred(dlist, v) else: log.warning("Unknown config field %s", field_name) index[field_name] = field_index + 1 - d = defer.DeferredList(dlist, fireOnOneErrback=1) - d.addErrback(dlist_err) - return d def pgtable_size(self, memory): """Return the size of memory needed for 1:1 page tables for physical @@ -1143,6 +1062,25 @@ class XendDomainInfo: return (1 + ((memory + 3) >> 2)) * 4 return 0 + def mem_target_set(self, target): + """Set domain memory target in pages. + """ + if self.channel: + msg = messages.packMsg('mem_request_t', { 'target' : target * (1 << 8)} ) + self.channel.writeRequest(msg) + + def shutdown(self, reason, key=0): + msgtype = shutdown_messages.get(reason) + if not msgtype: + raise XendError('invalid reason:' + reason) + extra = {} + if reason == 'sysrq': + extra['key'] = key + if self.channel: + msg = messages.packMsg(msgtype, extra) + self.channel.writeRequest(msg) + + def vm_image_linux(vm, image): """Create a VM for a linux image. @@ -1175,7 +1113,6 @@ def vm_image_plan9(vm, image): returns vm """ - #todo: Same as for linux. Is that right? If so can unify them. kernel = sxp.child_value(image, "kernel") cmdline = "" ip = sxp.child_value(image, "ip", "dhcp") @@ -1188,7 +1125,6 @@ def vm_image_plan9(vm, image): if args: cmdline += " " + args ramdisk = sxp.child_value(image, "ramdisk", '') - vifs = vm.config_devices("vif") vm.create_domain("plan9", kernel, ramdisk, cmdline) return vm @@ -1219,115 +1155,6 @@ def vm_image_vmx(vm, image): vm.create_domain("vmx", kernel, ramdisk, cmdline, memmap) return vm -def vm_dev_vif(vm, val, index, change=0): - """Create a virtual network interface (vif). - - @param vm: virtual machine - @param val: vif config - @param index: vif index - @return: deferred - """ - vif = vm.next_device_index('vif') - vmac = sxp.child_value(val, "mac") - ctrl = xend.netif_create(vm.dom, recreate=vm.recreate) - log.debug("Creating vif dom=%d vif=%d mac=%s", vm.dom, vif, str(vmac)) - recreate = vm.get_device_recreate('vif', index) - defer = ctrl.attachDevice(vif, val, recreate=recreate) - def cbok(dev): - dev.vifctl('up', vmname=vm.name) - dev.setIndex(index) - vm.add_device('vif', dev) - if change: - dev.interfaceChanged() - return dev - defer.addCallback(cbok) - return defer - -def vm_dev_usb(vm, val, index): - """Attach the relevant physical ports to the domains' USB interface. - - @param vm: virtual machine - @param val: USB interface config - @param index: USB interface index - @return: deferred - """ - ctrl = xend.usbif_create(vm.dom, recreate=vm.recreate) - log.debug("Creating USB interface dom=%d", vm.dom) - defer = ctrl.attachDevice(val, recreate=vm.recreate) - def cbok(path): - vm.add_device('usb', val[1][1]) - return path - defer.addCallback(cbok) - return defer - -def vm_dev_vbd(vm, val, index, change=0): - """Create a virtual block device (vbd). - - @param vm: virtual machine - @param val: vbd config - @param index: vbd index - @return: deferred - """ - idx = vm.next_device_index('vbd') - uname = sxp.child_value(val, 'uname') - log.debug("Creating vbd dom=%d uname=%s", vm.dom, uname) - ctrl = xend.blkif_create(vm.dom, recreate=vm.recreate) - recreate = vm.get_device_recreate('vbd', index) - defer = ctrl.attachDevice(idx, val, recreate=recreate) - def cbok(dev): - dev.setIndex(index) - vm.add_device('vbd', dev) - if change: - dev.interfaceChanged() - return dev - defer.addCallback(cbok) - return defer - -def parse_pci(val): - """Parse a pci field. - """ - if isinstance(val, types.StringType): - radix = 10 - if val.startswith('0x') or val.startswith('0X'): - radix = 16 - v = int(val, radix) - else: - v = val - return v - -def vm_dev_pci(vm, val, index, change=0): - """Add a pci device. - - @param vm: virtual machine - @param val: device configuration - @param index: device index - @return: 0 on success - """ - bus = sxp.child_value(val, 'bus') - if not bus: - raise VmError('pci: Missing bus') - dev = sxp.child_value(val, 'dev') - if not dev: - raise VmError('pci: Missing dev') - func = sxp.child_value(val, 'func') - if not func: - raise VmError('pci: Missing func') - try: - bus = parse_pci(bus) - dev = parse_pci(dev) - func = parse_pci(func) - except: - raise VmError('pci: invalid parameter') - log.debug("Creating pci device dom=%d bus=%x dev=%x func=%x", vm.dom, bus, dev, func) - rc = xc.physdev_pci_access_modify(dom=vm.dom, bus=bus, dev=dev, - func=func, enable=1) - if rc < 0: - #todo non-fatal - raise VmError('pci: Failed to configure device: bus=%s dev=%s func=%s' % - (bus, dev, func)) - return rc - - def vm_field_ignore(vm, config, val, index): """Dummy config field handler used for fields with built-in handling. @@ -1355,16 +1182,11 @@ def vm_field_maxmem(vm, config, val, index): raise VmError("invalid maxmem: " + str(maxmem)) xc.domain_setmaxmem(vm.dom, maxmem_kb = maxmem * 1024) +#============================================================================ # Register image handlers. -add_image_handler('linux', vm_image_linux) -add_image_handler('plan9', vm_image_plan9) -add_image_handler('vmx', vm_image_vmx) - -# Register device handlers. -add_device_handler('vif', vm_dev_vif) -add_device_handler('vbd', vm_dev_vbd) -add_device_handler('pci', vm_dev_pci) -add_device_handler('usb', vm_dev_usb) +add_image_handler('linux', vm_image_linux) +add_image_handler('plan9', vm_image_plan9) +add_image_handler('vmx', vm_image_vmx) # Ignore the fields we already handle. add_config_handler('name', vm_field_ignore) @@ -1380,3 +1202,27 @@ add_config_handler('vcpus', vm_field_ignore) # Register other config handlers. add_config_handler('maxmem', vm_field_maxmem) + +#============================================================================ +# Register device controllers and their device config types. + +from server import console +controller.addDevControllerClass("console", console.ConsoleController) + +from server import blkif +controller.addDevControllerClass("blkif", blkif.BlkifController) +add_device_handler("vbd", "blkif") + +from server import netif +controller.addDevControllerClass("netif", netif.NetifController) +add_device_handler("vif", "netif") + +from server import pciif +controller.addDevControllerClass("pciif", pciif.PciController) +add_device_handler("pci", "pciif") + +from xen.xend.server import usbif +controller.addDevControllerClass("usbif", usbif.UsbifController) +add_device_handler("usb", "usbif") + +#============================================================================ diff --git a/tools/python/xen/xend/scheduler.py b/tools/python/xen/xend/scheduler.py new file mode 100644 index 0000000000..f60ebab25b --- /dev/null +++ b/tools/python/xen/xend/scheduler.py @@ -0,0 +1,41 @@ +import threading + +class Scheduler: + + def __init__(self): + self.lock = threading.Lock() + self.schedule = {} + + def later(self, _delay, _name, _fn, args): + """Schedule a function to be called later (if not already scheduled). + + @param _delay: delay in seconds + @param _name: schedule name + @param _fn: function + @param args: arguments + """ + try: + self.lock.acquire() + if self.schedule.get(_name): return + timer = threading.Timer(_delay, _fn, args=args) + self.schedule[_name] = timer + finally: + self.lock.release() + timer.start() + + def cancel(self, name): + """Cancel a scheduled function call. + + @param name: schedule name to cancel + """ + try: + self.lock.acquire() + timer = self.schedule.get(name) + if not timer: + return + del self.schedule[name] + finally: + self.lock.release() + timer.cancel() + + diff --git a/tools/python/xen/xend/server/SrvDaemon.py b/tools/python/xen/xend/server/SrvDaemon.py index 2ca8f38f99..c63abbb9cb 100644 --- a/tools/python/xen/xend/server/SrvDaemon.py +++ b/tools/python/xen/xend/server/SrvDaemon.py @@ -39,294 +39,13 @@ from xen.xend.XendLogging import log from xen.util.ip import _readline, _readlines import channel -import blkif -import netif -import usbif -import console -import domain +import controller +import event from params import * -DAEMONIZE = 1 +DAEMONIZE = 0 DEBUG = 1 -class NotifierProtocol(protocol.Protocol): - """Asynchronous handler for i/o on the notifier (event channel). - """ - - def __init__(self, channelFactory): - self.channelFactory = channelFactory - - def notificationReceived(self, idx): - channel = self.channelFactory.getChannel(idx) - if channel: - channel.notificationReceived() - - def connectionLost(self, reason=None): - pass - - def doStart(self): - pass - - def doStop(self): - pass - - def startProtocol(self): - pass - - def stopProtocol(self): - pass - -class NotifierPort(abstract.FileDescriptor): - """Transport class for the event channel. - """ - - def __init__(self, daemon, notifier, proto, reactor=None): - assert isinstance(proto, NotifierProtocol) - abstract.FileDescriptor.__init__(self, reactor) - self.daemon = daemon - self.notifier = notifier - self.protocol = proto - - def startListening(self): - self._bindNotifier() - self._connectToProtocol() - - def stopListening(self): - if self.connected: - result = self.d = defer.Deferred() - else: - result = None - self.loseConnection() - return result - - def fileno(self): - return self.notifier.fileno() - - def _bindNotifier(self): - self.connected = 1 - - def _connectToProtocol(self): - self.protocol.makeConnection(self) - self.startReading() - - def loseConnection(self): - if self.connected: - self.stopReading() - self.disconnecting = 1 - reactor.callLater(0, self.connectionLost) - - def connectionLost(self, reason=None): - abstract.FileDescriptor.connectionLost(self, reason) - if hasattr(self, 'protocol'): - self.protocol.doStop() - self.connected = 0 - #self.notifier.close() # (this said:) Not implemented. - #os.close(self.fileno()) # But yes it is... - del self.notifier # ...as _dealloc! - if hasattr(self, 'd'): - self.d.callback(None) - del self.d - - def doRead(self): - count = 0 - while 1: - notification = self.notifier.read() - if not notification: - break - self.protocol.notificationReceived(notification) - self.notifier.unmask(notification) - count += 1 - -class EventProtocol(protocol.Protocol): - """Asynchronous handler for a connected event socket. - """ - - def __init__(self, daemon): - #protocol.Protocol.__init__(self) - self.daemon = daemon - # Event queue. - self.queue = [] - # Subscribed events. - self.events = [] - self.parser = sxp.Parser() - self.pretty = 0 - - # For debugging subscribe to everything and make output pretty. - self.subscribe(['*']) - self.pretty = 1 - - def dataReceived(self, data): - try: - self.parser.input(data) - if self.parser.ready(): - val = self.parser.get_val() - res = self.dispatch(val) - self.send_result(res) - if self.parser.at_eof(): - self.loseConnection() - except SystemExit: - raise - except: - if DEBUG: - raise - else: - self.send_error() - - def loseConnection(self): - if self.transport: - self.transport.loseConnection() - if self.connected: - reactor.callLater(0, self.connectionLost) - - def connectionLost(self, reason=None): - self.unsubscribe() - - def send_reply(self, sxpr): - io = StringIO.StringIO() - if self.pretty: - PrettyPrint.prettyprint(sxpr, out=io) - else: - sxp.show(sxpr, out=io) - print >> io - io.seek(0) - return self.transport.write(io.getvalue()) - - def send_result(self, res): - return self.send_reply(['ok', res]) - - def send_error(self): - (extype, exval) = sys.exc_info()[:2] - return self.send_reply(['err', - ['type', str(extype)], - ['value', str(exval)]]) - - def send_event(self, val): - return self.send_reply(['event', val[0], val[1]]) - - def unsubscribe(self): - for event in self.events: - eserver.unsubscribe(event, self.queue_event) - - def subscribe(self, events): - self.unsubscribe() - for event in events: - eserver.subscribe(event, self.queue_event) - self.events = events - - def queue_event(self, name, v): - # Despite the name we don't queue the event here. - # We send it because the transport will queue it. - self.send_event([name, v]) - - def opname(self, name): - return 'op_' + name.replace('.', '_') - - def operror(self, name, req): - raise XendError('Invalid operation: ' +name) - - def dispatch(self, req): - op_name = sxp.name(req) - op_method_name = self.opname(op_name) - op_method = getattr(self, op_method_name, self.operror) - return op_method(op_name, req) - - def op_help(self, name, req): - def nameop(x): - if x.startswith('op_'): - return x[3:].replace('_', '.') - else: - return x - - l = [ nameop(k) for k in dir(self) if k.startswith('op_') ] - return l - - def op_quit(self, name, req): - self.loseConnection() - - def op_exit(self, name, req): - sys.exit(0) - - def op_pretty(self, name, req): - self.pretty = 1 - return ['ok'] - - def op_console_disconnect(self, name, req): - id = sxp.child_value(req, 'id') - if not id: - raise XendError('Missing console id') - id = int(id) - self.daemon.console_disconnect(id) - return ['ok'] - - def op_info(self, name, req): - val = ['info'] - val += self.daemon.consoles() - val += self.daemon.blkifs() - val += self.daemon.netifs() - val += self.daemon.usbifs() - return val - - def op_sys_subscribe(self, name, v): - # (sys.subscribe event*) - # Subscribe to the events: - self.subscribe(v[1:]) - return ['ok'] - - def op_sys_inject(self, name, v): - # (sys.inject event) - event = v[1] - eserver.inject(sxp.name(event), event) - return ['ok'] - - def op_trace(self, name, v): - mode = (v[1] == 'on') - self.daemon.tracing(mode) - - def op_log_stderr(self, name, v): - mode = v[1] - logging = XendRoot.instance().get_logging() - if mode == 'on': - logging.addLogStderr() - else: - logging.removeLogStderr() - - def op_debug_msg(self, name, v): - mode = v[1] - import messages - messages.DEBUG = (mode == 'on') - - def op_debug_controller(self, name, v): - mode = v[1] - import controller - controller.DEBUG = (mode == 'on') - - -class EventFactory(protocol.Factory): - """Asynchronous handler for the event server socket. - """ - protocol = EventProtocol - service = None - - def __init__(self, daemon): - #protocol.Factory.__init__(self) - self.daemon = daemon - - def buildProtocol(self, addr): - proto = self.protocol(self.daemon) - proto.factory = self - return proto - -class VirqClient: - def __init__(self, daemon): - self.daemon = daemon - - def virqReceived(self, virq): - print 'VirqClient.virqReceived>', virq - eserver.inject('xend.virq', virq) - - def lostChannel(self, channel): - print 'VirqClient.lostChannel>', channel - class Daemon: """The xend daemon. """ @@ -469,6 +188,7 @@ class Daemon: pass else: # Child + self.daemonize() os.execl("/usr/sbin/xfrd", "xfrd") def daemonize(self): @@ -504,8 +224,6 @@ class Daemon: xfrd_pid = self.cleanup_xfrd() - self.daemonize() - if self.set_user(): return 4 os.chdir("/") @@ -608,146 +326,43 @@ class Daemon: return self.cleanup(kill=True) def run(self): - xroot = XendRoot.instance() - log.info("Xend Daemon started") - self.createFactories() - self.listenEvent(xroot) - self.listenNotifier() - self.listenVirq() - SrvServer.create(bridge=1) - reactor.run() + try: + xroot = XendRoot.instance() + log.info("Xend Daemon started") + self.createFactories() + self.listenEvent(xroot) + self.listenVirq() + self.listenChannels() + SrvServer.create(bridge=1) + self.daemonize() + reactor.run() + except Exception, ex: + print >>sys.stderr, 'Exception starting xend:', ex + self.exit(1) + def createFactories(self): self.channelF = channel.channelFactory() - self.domainCF = domain.DomainControllerFactory() - self.blkifCF = blkif.BlkifControllerFactory() - self.netifCF = netif.NetifControllerFactory() - self.usbifCF = usbif.UsbifControllerFactory() - self.consoleCF = console.ConsoleControllerFactory() def listenEvent(self, xroot): - protocol = EventFactory(self) port = xroot.get_xend_event_port() interface = xroot.get_xend_address() - return reactor.listenTCP(port, protocol, interface=interface) + return event.listenEvent(self, port, interface) - def listenNotifier(self): - protocol = NotifierProtocol(self.channelF) - p = NotifierPort(self, self.channelF.notifier, protocol, reactor) - p.startListening() - return p + def listenChannels(self): + self.channelF.start() def listenVirq(self): - virqChan = self.channelF.virqChannel(channel.VIRQ_DOM_EXC) - virqChan.registerClient(VirqClient(self)) + def virqReceived(virq): + print 'virqReceived>', virq + eserver.inject('xend.virq', virq) + self.channelF.setVirqHandler(virqReceived) - def exit(self): + def exit(self, rc=0): reactor.disconnectAll() - sys.exit(0) - - def getDomChannel(self, dom): - """Get the channel to a domain. - - @param dom: domain - @return: channel (or None) - """ - return self.channelF.getDomChannel(dom) - - def createDomChannel(self, dom, local_port=0, remote_port=0): - """Get the channel to a domain, creating if necessary. - - @param dom: domain - @param local_port: optional local port to re-use - @param remote_port: optional remote port to re-use - @return: channel - """ - return self.channelF.domChannel(dom, local_port=local_port, - remote_port=remote_port) - - def blkif_create(self, dom, recreate=0): - """Create or get a block device interface controller. - - Returns controller - """ - blkif = self.blkifCF.getController(dom) - blkif.daemon = self - return blkif - - def blkifs(self): - return [ x.sxpr() for x in self.blkifCF.getControllers() ] - - def blkif_get(self, dom): - return self.blkifCF.getControllerByDom(dom) - - def netif_create(self, dom, recreate=0): - """Create or get a network interface controller. - - """ - return self.netifCF.getController(dom) + self.channelF.stop() + sys.exit(rc) - def netifs(self): - return [ x.sxpr() for x in self.netifCF.getControllers() ] - - def netif_get(self, dom): - return self.netifCF.getControllerByDom(dom) - - def usbif_create(self, dom, recreate=0): - return self.usbifCF.getController(dom) - - def usbifs(self): - return [ x.sxpr() for x in self.usbifCF.getControllers() ] - - def usbif_get(self, dom): - return self.usbifCF.getControllerByDom(dom) - - def console_create(self, dom, console_port=None): - """Create a console for a domain. - """ - console = self.consoleCF.getControllerByDom(dom) - if console is None: - console = self.consoleCF.createController(dom, console_port) - return console - - def consoles(self): - return [ c.sxpr() for c in self.consoleCF.getControllers() ] - - def get_consoles(self): - return self.consoleCF.getControllers() - - def get_console(self, id): - return self.consoleCF.getControllerByIndex(id) - - def get_domain_console(self, dom): - return self.consoleCF.getControllerByDom(dom) - - def console_disconnect(self, id): - """Disconnect any connected console client. - """ - console = self.get_console(id) - if not console: - raise XendError('Invalid console id') - console.disconnect() - - def domain_shutdown(self, dom, reason, key=0): - """Shutdown a domain. - """ - dom = int(dom) - ctrl = self.domainCF.getController(dom) - if not ctrl: - raise XendError('No domain controller: %s' % dom) - ctrl.shutdown(reason, key) - return 0 - - def domain_mem_target_set(self, dom, target): - """Set memory target for a domain. - """ - dom = int(dom) - ctrl = self.domainCF.getController(dom) - if not ctrl: - raise XendError('No domain controller: %s' % dom) - ctrl.mem_target_set(target) - return 0 - def instance(): global inst try: diff --git a/tools/python/xen/xend/server/SrvDomainDir.py b/tools/python/xen/xend/server/SrvDomainDir.py index 2fc8ee4877..59c4322777 100644 --- a/tools/python/xen/xend/server/SrvDomainDir.py +++ b/tools/python/xen/xend/server/SrvDomainDir.py @@ -62,9 +62,8 @@ class SrvDomainDir(SrvDir): if not ok: raise XendError(errmsg) try: - deferred = self.xd.domain_create(config) - deferred.addCallback(self._op_create_cb, configstring, req) - return deferred + dominfo = self.xd.domain_create(config) + return self._op_create_cb(dominfo, configstring, req) except Exception, ex: print 'op_create> Exception creating domain:' traceback.print_exc() @@ -97,9 +96,8 @@ class SrvDomainDir(SrvDir): """ fn = FormFn(self.xd.domain_restore, [['file', 'str']]) - deferred = fn(req.args) - deferred.addCallback(self._op_restore_cb, req) - return deferred + dominfo = fn(req.args) + return self._op_restore_cb(dominfo, req) def _op_restore_cb(self, dominfo, req): dom = dominfo.name diff --git a/tools/python/xen/xend/server/blkif.py b/tools/python/xen/xend/server/blkif.py index 2974f070b0..f5401f7573 100755 --- a/tools/python/xen/xend/server/blkif.py +++ b/tools/python/xen/xend/server/blkif.py @@ -2,54 +2,29 @@ """Support for virtual block devices. """ -from twisted.internet import defer +import os +import re +import string +from xen.xend.XendError import XendError, VmError +from xen.xend import XendRoot +from xen.xend.XendLogging import log from xen.xend import sxp from xen.xend import Blkctl -from xen.xend.XendLogging import log -from xen.xend.XendError import XendError, VmError -import os -import re -import string import channel -import controller +from controller import CtrlMsgRcvr, Dev, DevController from messages import * from xen.util.ip import _readline, _readlines def expand_dev_name(name): + if not name: + return name if re.match( '^/dev/', name ): - return name + return name else: - return '/dev/' + name - -def check_mounted(self, name): - mode = None - name = expand_dev_name(name) - lines = _readlines(os.popen('mount 2>/dev/null')) - exp = re.compile('^' + name + ' .*[\(,]r(?P[ow])[,\)]') - for line in lines: - pm = exp.match(line) - if not pm: continue - mode = pm.group('mode') - break - if mode is 'w': - return mode - if mode is 'o': - mode = 'r' - blkifs = self.ctrl.daemon.blkifs() - for blkif in blkifs: - if blkif[1][1] is self.ctrl.dom: - continue - for dev in self.ctrl.daemon.blkif_get(blkif[1][1]).getDevices(): - if dev.type == 'phy' and name == expand_dev_name(dev.params): - mode = dev.mode - if 'w' in mode: - return 'w' - if mode and 'r' in mode: - return 'r' - return None + return '/dev/' + name def blkdev_name_to_number(name): """Take the given textual block-device name (e.g., '/dev/sda1', @@ -58,10 +33,10 @@ def blkdev_name_to_number(name): n = expand_dev_name(name) try: - return os.stat(n).st_rdev + return os.stat(n).st_rdev except Exception, ex: log.debug("exception looking up device number for %s: %s", name, ex) - pass + pass if re.match( '/dev/sd[a-p]([0-9]|1[0-5])', n): return 8 * 256 + 16 * (ord(n[7:8]) - ord('a')) + int(n[8:]) @@ -74,8 +49,8 @@ def blkdev_name_to_number(name): # see if this is a hex device number if re.match( '^(0x)?[0-9a-fA-F]+$', name ): - return string.atoi(name,16) - + return string.atoi(name,16) + return None def blkdev_segment(name): @@ -90,50 +65,70 @@ def blkdev_segment(name): val = None n = blkdev_name_to_number(name) if n: - val = { 'device' : n, + val = { 'device' : n, 'start_sector' : long(0), - 'nr_sectors' : long(1L<<63), - 'type' : 'Disk' } + 'nr_sectors' : long(1L<<63), + 'type' : 'Disk' } return val -class BlkifBackendController(controller.BackendController): - """ Handler for the 'back-end' channel to a block device driver domain. - """ - - def __init__(self, factory, dom): - controller.BackendController.__init__(self, factory, dom) - self.addMethod(CMSG_BLKIF_BE, - CMSG_BLKIF_BE_DRIVER_STATUS, - self.recv_be_driver_status) - self.registerChannel() - - def recv_be_driver_status(self, msg, req): - """Request handler for be_driver_status messages. - - @param msg: message - @type msg: xu message - @param req: request flag (true if the msg is a request) - @type req: bool - """ - val = unpackMsg('blkif_be_driver_status_t', msg) - status = val['status'] - -class BlkifBackendInterface(controller.BackendInterface): +def mount_mode(name): + mode = None + name = expand_dev_name(name) + lines = _readlines(os.popen('mount 2>/dev/null')) + exp = re.compile('^' + name + ' .*[\(,]r(?P[ow])[,\)]') + for line in lines: + pm = exp.match(line) + if not pm: continue + mode = pm.group('mode') + break + if mode == 'w': + return mode + if mode == 'o': + mode = 'r' + return mode + +class BlkifBackend: """ Handler for the 'back-end' channel to a block device driver domain on behalf of a front-end domain. Must be connected using connect() before it can be used. - Do not create directly - use getBackendInterface() on the BlkifController. """ - def __init__(self, ctrl, dom, handle): - controller.BackendInterface.__init__(self, ctrl, dom, handle) - self.connected = 0 + def __init__(self, controller, id, dom, recreate=False): + self.controller = controller + self.id = id + self.frontendDomain = self.controller.getDomain() + self.frontendChannel = None + self.backendDomain = dom + self.backendChannel = None + self.destroyed = False + self.connected = False self.evtchn = None self.status = BLKIF_INTERFACE_STATUS_DISCONNECTED + def init(self, recreate=False, reboot=False): + self.destroyed = False + self.frontendDomain = self.controller.getDomain() + self.frontendChannel = self.controller.getChannel() + cf = channel.channelFactory() + self.backendChannel = cf.openChannel(self.backendDomain) + def __str__(self): - return '' % (self.controller.dom, self.dom) + return ('' + % (self.frontendDomain, + self.backendDomain, + self.id)) + def getId(self): + return self.id + + def closeEvtchn(self): + if self.evtchn: + channel.eventChannelClose(self.evtchn) + self.evtchn = None + + def openEvtchn(self): + self.evtchn = channel.eventChannel(self.backendDomain, self.frontendDomain) + def getEventChannelBackend(self): val = 0 if self.evtchn: @@ -146,91 +141,76 @@ class BlkifBackendInterface(controller.BackendInterface): val = self.evtchn['port2'] return val - def connect(self, recreate=0): + def connect(self, recreate=False): """Connect to the blkif control interface. @param recreate: true if after xend restart - @return: deferred """ log.debug("Connecting blkif %s", str(self)) if recreate or self.connected: - d = defer.succeed(self) + self.connected = True + pass else: - d = self.send_be_create() - d.addCallback(self.respond_be_create) - return d + self.send_be_create() def send_be_create(self): - d = defer.Deferred() + log.debug("send_be_create %s", str(self)) msg = packMsg('blkif_be_create_t', - { 'domid' : self.controller.dom, - 'blkif_handle' : self.handle }) - self.writeRequest(msg, response=d) - return d - - def respond_be_create(self, msg): - val = unpackMsg('blkif_be_create_t', msg) - self.connected = 1 - return self - - def destroy(self): + { 'domid' : self.frontendDomain, + 'blkif_handle' : self.id }) + msg = self.backendChannel.requestResponse(msg) + #todo: check return status + self.connected = True + + def destroy(self, change=False, reboot=False): """Disconnect from the blkif control interface and destroy it. """ - def cb_destroy(val): - self.send_be_destroy() - self.close() - d = defer.Deferred() - d.addCallback(cb_destroy) - if self.evtchn: - channel.eventChannelClose(self.evtchn) - self.send_be_disconnect(response=d) - - def send_be_disconnect(self, response=None): + self.send_be_disconnect() + self.send_be_destroy() + self.closeEvtchn() + self.destroyed = True + # For change true need to notify front-end, or back-end will do it? + + def send_be_disconnect(self): msg = packMsg('blkif_be_disconnect_t', - { 'domid' : self.controller.dom, - 'blkif_handle' : self.handle }) - self.writeRequest(msg, response=response) + { 'domid' : self.frontendDomain, + 'blkif_handle' : self.id }) + self.backendChannel.writeRequest(msg) + self.connected = False - def send_be_destroy(self, response=None): + def send_be_destroy(self): msg = packMsg('blkif_be_destroy_t', - { 'domid' : self.controller.dom, - 'blkif_handle' : self.handle }) - self.writeRequest(msg, response=response) + { 'domid' : self.frontendDomain, + 'blkif_handle' : self.id }) + self.backendChannel.writeRequest(msg) def connectInterface(self, val): - self.evtchn = channel.eventChannel(self.dom, self.controller.dom) + self.openEvtchn() log.debug("Connecting blkif to event channel %s ports=%d:%d", str(self), self.evtchn['port1'], self.evtchn['port2']) msg = packMsg('blkif_be_connect_t', - { 'domid' : self.controller.dom, - 'blkif_handle' : self.handle, + { 'domid' : self.frontendDomain, + 'blkif_handle' : self.id, 'evtchn' : self.getEventChannelBackend(), 'shmem_frame' : val['shmem_frame'] }) - d = defer.Deferred() - d.addCallback(self.respond_be_connect) - self.writeRequest(msg, response=d) - - def respond_be_connect(self, msg): - """Response handler for a be_connect message. - - @param msg: message - @type msg: xu message - """ + msg = self.backendChannel.requestResponse(msg) + #todo: check return status val = unpackMsg('blkif_be_connect_t', msg) self.status = BLKIF_INTERFACE_STATUS_CONNECTED self.send_fe_interface_status() - def send_fe_interface_status(self, response=None): + def send_fe_interface_status(self): msg = packMsg('blkif_fe_interface_status_t', - { 'handle' : self.handle, + { 'handle' : self.id, 'status' : self.status, - 'domid' : self.dom, + 'domid' : self.backendDomain, 'evtchn' : self.getEventChannelFrontend() }) - self.controller.writeRequest(msg, response=response) + self.frontendChannel.writeRequest(msg) def interfaceDisconnected(self): self.status = BLKIF_INTERFACE_STATUS_DISCONNECTED - #todo?: Do this: self.evtchn = None + #todo?: Close evtchn: + #self.closeEvtchn() self.send_fe_interface_status() def interfaceChanged(self): @@ -238,83 +218,18 @@ class BlkifBackendInterface(controller.BackendInterface): The front-end should then probe for devices. """ msg = packMsg('blkif_fe_interface_status_t', - { 'handle' : self.handle, + { 'handle' : self.id, 'status' : BLKIF_INTERFACE_STATUS_CHANGED, - 'domid' : self.dom, + 'domid' : self.backendDomain, 'evtchn' : 0 }) - self.controller.writeRequest(msg) - -class BlkifControllerFactory(controller.SplitControllerFactory): - """Factory for creating block device interface controllers. - """ + self.frontendChannel.writeRequest(msg) - def __init__(self): - controller.SplitControllerFactory.__init__(self) - - def createController(self, dom, recreate=0): - """Create a block device controller for a domain. - - @param dom: domain - @type dom: int - @param recreate: if true it's a recreate (after xend restart) - @type recreate: bool - @return: block device controller - @rtype: BlkifController - """ - blkif = self.getControllerByDom(dom) - if blkif is None: - blkif = BlkifController(self, dom) - self.addController(blkif) - return blkif - - def createBackendController(self, dom): - """Create a block device backend controller. - - @param dom: backend domain - @return: backend controller - """ - return BlkifBackendController(self, dom) - - def createBackendInterface(self, ctrl, dom, handle): - """Create a block device backend interface. - - @param ctrl: controller - @param dom: backend domain - @param handle: interface handle - @return: backend interface - """ - return BlkifBackendInterface(ctrl, dom, handle) - - def getDomainDevices(self, dom): - """Get the block devices for a domain. - - @param dom: domain - @type dom: int - @return: devices - @rtype: [device] - """ - blkif = self.getControllerByDom(dom) - return (blkif and blkif.getDevices()) or [] - - def getDomainDevice(self, dom, idx): - """Get a block device from a domain. - - @param dom: domain - @type dom: int - @param idx: device index - @type idx: int - @return: device - @rtype: device - """ - blkif = self.getControllerByDom(dom) - return (blkif and blkif.getDevice(idx)) or None - -class BlkDev(controller.SplitDev): +class BlkDev(Dev): """Info record for a block device. """ - def __init__(self, idx, ctrl, config): - controller.SplitDev.__init__(self, idx, ctrl) + def __init__(self, controller, id, config, recreate=False): + Dev.__init__(self, controller, id, config, recreate=recreate) self.dev = None self.uname = None self.vdev = None @@ -325,10 +240,27 @@ class BlkDev(controller.SplitDev): self.device = None self.start_sector = None self.nr_sectors = None - self.ctrl = ctrl - self.configure(config) - - def configure(self, config): + + self.frontendDomain = self.getDomain() + self.frontendChannel = None + self.backendDomain = None + self.backendChannel = None + self.backendId = 0 + self.configure(self.config, recreate=recreate) + + def init(self, recreate=False, reboot=False): + print 'BlkDev>init>' + self.frontendDomain = self.getDomain() + self.frontendChannel = self.getChannel() + backend = self.getBackend() + self.backendChannel = backend.backendChannel + self.backendId = backend.id + print 'BlkDev>init<' + + def configure(self, config, change=False, recreate=False): + print 'BlkDev>configure>' + if change: + raise XendError("cannot reconfigure vbd") self.config = config self.uname = sxp.child_value(config, 'uname') if not self.uname: @@ -340,23 +272,33 @@ class BlkDev(controller.SplitDev): if not self.dev: raise VmError('vbd: Missing dev') self.mode = sxp.child_value(config, 'mode', 'r') - # todo: The 'dev' should be looked up in the context of the domain. + self.vdev = blkdev_name_to_number(self.dev) if not self.vdev: raise VmError('vbd: Device not found: %s' % self.dev) + try: self.backendDomain = int(sxp.child_value(config, 'backend', '0')) except: raise XendError('invalid backend domain') - def recreate(self, savedinfo): - node = sxp.child_value(savedinfo, 'node') - self.setNode(node) + print 'BlkDev>configure<' + return self.config - def attach(self): - node = Blkctl.block('bind', self.type, self.params) - self.setNode(node) - return self.attachBackend() + def attach(self, recreate=False, change=False): + print 'BlkDev>attach>', self + if recreate: + print 'attach>', 'recreate=', recreate + node = sxp.child_value(recreate, 'node') + print 'attach>', 'node=', node + self.setNode(node) + else: + node = Blkctl.block('bind', self.type, self.params) + self.setNode(node) + self.attachBackend() + if change: + self.interfaceChanged() + print 'BlkDev>attach<', self def unbind(self): if self.node is None: return @@ -379,14 +321,15 @@ class BlkDev(controller.SplitDev): return # done. - mounted_mode = check_mounted(self, node) + mounted_mode = self.check_mounted(node) if not '!' in self.mode and mounted_mode: - if mounted_mode is "w": + if mounted_mode == "w": raise VmError("vbd: Segment %s is in writable use" % self.uname) elif 'w' in self.mode: raise VmError("vbd: Segment %s is in read-only use" % self.uname) + segment = blkdev_segment(node) if not segment: raise VmError("vbd: Segment not found: uname=%s" % self.uname) @@ -395,12 +338,28 @@ class BlkDev(controller.SplitDev): self.start_sector = segment['start_sector'] self.nr_sectors = segment['nr_sectors'] + def check_mounted(self, name): + mode = mount_mode(name) + xd = XendRoot.get_component('xen.xend.XendDomain') + for vm in xd.domains(): + ctrl = vm.getDeviceController(self.getType(), error=False) + if (not ctrl): continue + for dev in ctrl.getDevices(): + if dev is self: continue + if dev.type == 'phy' and name == expand_dev_name(dev.params): + mode = dev.mode + if 'w' in mode: + return 'w' + if mode and 'r' in mode: + return 'r' + return None + def readonly(self): return 'w' not in self.mode def sxpr(self): val = ['vbd', - ['idx', self.idx], + ['id', self.id], ['vdev', self.vdev], ['device', self.device], ['mode', self.mode]] @@ -410,163 +369,162 @@ class BlkDev(controller.SplitDev): val.append(['uname', self.uname]) if self.node: val.append(['node', self.node]) - if self.index is not None: - val.append(['index', self.index]) + val.append(['index', self.getIndex()]) return val + def getBackend(self): + return self.controller.getBackend(self.backendDomain) + def refresh(self): - log.debug("Refreshing vbd domain=%d idx=%s", self.controller.dom, self.idx) + log.debug("Refreshing vbd domain=%d id=%s", self.frontendDomain, self.id) self.interfaceChanged() - def destroy(self, change=0): + def destroy(self, change=False, reboot=False): """Destroy the device. If 'change' is true notify the front-end interface. @param change: change flag """ - log.debug("Destroying vbd domain=%d idx=%s", self.controller.dom, self.idx) - d = self.send_be_vbd_destroy() + self.destroyed = True + log.debug("Destroying vbd domain=%d id=%s", self.frontendDomain, self.id) + self.send_be_vbd_destroy() if change: - d.addCallback(lambda val: self.interfaceChanged()) - d.addCallback(lambda val: self.unbind()) + self.interfaceChanged() + self.unbind() def interfaceChanged(self): """Tell the back-end to notify the front-end that a device has been added or removed. """ - self.getBackendInterface().interfaceChanged() + self.getBackend().interfaceChanged() def attachBackend(self): """Attach the device to its controller. """ - backend = self.getBackendInterface() - d1 = backend.connect() - d2 = defer.Deferred() - d2.addCallback(self.send_be_vbd_create) - d1.chainDeferred(d2) - return d2 + print 'BlkDev>attachBackend>' + self.getBackend().connect() + self.send_be_vbd_create() + print 'BlkDev>attachBackend<' - def send_be_vbd_create(self, val): - d = defer.Deferred() - d.addCallback(self.respond_be_vbd_create) - backend = self.getBackendInterface() + def send_be_vbd_create(self): + print 'BlkDev>send_be_vbd_create>' msg = packMsg('blkif_be_vbd_create_t', - { 'domid' : self.controller.dom, - 'blkif_handle' : backend.handle, + { 'domid' : self.frontendDomain, + 'blkif_handle' : self.backendId, 'pdevice' : self.device, 'vdevice' : self.vdev, 'readonly' : self.readonly() }) - backend.writeRequest(msg, response=d) - return d + msg = self.backendChannel.requestResponse(msg) - def respond_be_vbd_create(self, msg): val = unpackMsg('blkif_be_vbd_create_t', msg) - status = val['status'] - if status != BLKIF_BE_STATUS_OKAY: + status = val['status'] + if status != BLKIF_BE_STATUS_OKAY: raise XendError("Creating vbd failed: device %s, error %d" % (sxp.to_string(self.config), status)) - return self def send_be_vbd_destroy(self): - d = defer.Deferred() - backend = self.getBackendInterface() msg = packMsg('blkif_be_vbd_destroy_t', - { 'domid' : self.controller.dom, - 'blkif_handle' : backend.handle, + { 'domid' : self.frontendDomain, + 'blkif_handle' : self.backendId, 'vdevice' : self.vdev }) - self.controller.delDevice(self.vdev) - backend.writeRequest(msg, response=d) - return d + return self.backendChannel.writeRequest(msg) - -class BlkifController(controller.SplitController): +class BlkifController(DevController): """Block device interface controller. Handles all block devices for a domain. """ - def __init__(self, factory, dom): + def __init__(self, dctype, vm, recreate=False): """Create a block device controller. - Do not call directly - use createController() on the factory instead. """ - controller.SplitController.__init__(self, factory, dom) - self.addMethod(CMSG_BLKIF_FE, - CMSG_BLKIF_FE_DRIVER_STATUS, - self.recv_fe_driver_status) - self.addMethod(CMSG_BLKIF_FE, - CMSG_BLKIF_FE_INTERFACE_CONNECT, - self.recv_fe_interface_connect) - self.registerChannel() + DevController.__init__(self, dctype, vm, recreate=recreate) + self.backends = {} + self.backendId = 0 + self.rcvr = None + + def initController(self, recreate=False, reboot=False): + print 'BlkifController>initController>' + self.destroyed = False + # Add our handlers for incoming requests. + self.rcvr = CtrlMsgRcvr(self.getChannel()) + self.rcvr.addHandler(CMSG_BLKIF_FE, + CMSG_BLKIF_FE_DRIVER_STATUS, + self.recv_fe_driver_status) + self.rcvr.addHandler(CMSG_BLKIF_FE, + CMSG_BLKIF_FE_INTERFACE_CONNECT, + self.recv_fe_interface_connect) + self.rcvr.registerChannel() + if reboot: + self.rebootBackends() + self.rebootDevices() + print 'BlkifController>initController<' def sxpr(self): - val = ['blkif', ['dom', self.dom]] + val = ['blkif', ['dom', self.getDomain()]] return val - def addDevice(self, idx, config): - """Add a device to the device table. + def rebootBackends(self): + for backend in self.backends.values(): + backend.init(reboot=True) - @param vdev: device index - @type vdev: int - @param config: device configuration - @return: device - @rtype: BlkDev - """ - if idx in self.devices: - raise XendError('device exists: ' + str(idx)) - dev = BlkDev(idx, self, config ) - self.devices[idx] = dev - return dev + def getBackendById(self, id): + return self.backends.get(id) + + def getBackendByDomain(self, dom): + for backend in self.backends.values(): + if backend.backendDomain == dom: + return backend + return None - def attachDevice(self, idx, config, recreate=0): - """Attach a device to the specified interface. - On success the returned deferred will be called with the device. + def getBackend(self, dom): + backend = self.getBackendByDomain(dom) + if backend: return backend + backend = BlkifBackend(self, self.backendId, dom) + self.backendId += 1 + self.backends[backend.getId()] = backend + backend.init() + return backend - @param idx: device id + def newDevice(self, id, config, recreate=False): + """Create a device.. + + @param id: device id @param config: device configuration @param recreate: if true it's being recreated (after xend restart) @type recreate: bool - @return: deferred - @rtype: Deferred + @return: device + @rtype: BlkDev """ - dev = self.addDevice(idx, config) - if recreate: - dev.recreate(recreate) - d = defer.succeed(dev) - else: - d = dev.attach() - return d - - def destroy(self): + return BlkDev(self, id, config, recreate=recreate) + + def destroyController(self, reboot=False): """Destroy the controller and all devices. """ - log.debug("Destroying blkif domain=%d", self.dom) - self.destroyDevices() - self.destroyBackends() - - def destroyDevices(self): - """Destroy all devices. - """ - for dev in self.getDevices(): - dev.destroy() + self.destroyed = True + log.debug("Destroying blkif domain=%d", self.getDomain()) + self.destroyDevices(reboot=reboot) + self.destroyBackends(reboot=reboot) + self.rcvr.deregisterChannel() - def destroyBackends(self): - for backend in self.getBackendInterfaces(): - backend.destroy() + def destroyBackends(self, reboot=False): + for backend in self.backends.values(): + backend.destroy(reboot=reboot) - def recv_fe_driver_status(self, msg, req): + def recv_fe_driver_status(self, msg): val = unpackMsg('blkif_fe_driver_status_t', msg) - print 'recv_fe_driver_status>', val - for backend in self.getBackendInterfaces(): + for backend in self.backends.values(): backend.interfaceDisconnected() - def recv_fe_interface_connect(self, msg, req): + def recv_fe_interface_connect(self, msg): val = unpackMsg('blkif_fe_interface_connect_t', msg) - handle = val['handle'] - backend = self.getBackendInterfaceByHandle(handle) + id = val['handle'] + backend = self.getBackendById(id) if backend: - backend.connectInterface(val) + try: + backend.connectInterface(val) + except IOError, ex: + log.error("Exception connecting backend: %s", ex) else: - log.error('interface connect on unknown interface: handle=%d', handle) - - + log.error('interface connect on unknown interface: id=%d', id) diff --git a/tools/python/xen/xend/server/channel.py b/tools/python/xen/xend/server/channel.py index 4957a91b94..95c5e8cc3c 100755 --- a/tools/python/xen/xend/server/channel.py +++ b/tools/python/xen/xend/server/channel.py @@ -1,8 +1,12 @@ # Copyright (C) 2004 Mike Wray +import threading +import select + import xen.lowlevel.xc; xc = xen.lowlevel.xc.new() from xen.lowlevel import xu -from messages import msgTypeName, printMsg + +from messages import * VIRQ_MISDIRECT = 0 # Catch-all interrupt for unbound VIRQs. VIRQ_TIMER = 1 # Timebase update, and/or requested timeout. @@ -10,6 +14,10 @@ VIRQ_DEBUG = 2 # Request guest to dump debug info. VIRQ_CONSOLE = 3 # (DOM0) bytes received on emergency console. VIRQ_DOM_EXC = 4 # (DOM0) Exceptional event for some domain. +DEBUG = 0 + +RESPONSE_TIMEOUT = 20.0 + def eventChannel(dom1, dom2): """Create an event channel between domains. The returned dict contains dom1, dom2, port1 and port2 on success. @@ -33,6 +41,7 @@ def eventChannelClose(evtchn): pass if not evtchn: return + print 'eventChannelClose>', evtchn evtchn_close(evtchn.get('dom1'), evtchn.get('port1')) evtchn_close(evtchn.get('dom2'), evtchn.get('port2')) @@ -45,76 +54,135 @@ class ChannelFactory: """ Channels indexed by index. """ channels = {} + thread = None + + notifier = None + + """Map of ports to the virq they signal.""" + virqPorts = {} + def __init__(self): """Constructor - do not use. Use the channelFactory function.""" self.notifier = xu.notifier() + self.bind_virq(VIRQ_MISDIRECT) + self.bind_virq(VIRQ_TIMER) + self.bind_virq(VIRQ_DEBUG) + self.bind_virq(VIRQ_CONSOLE) + self.bind_virq(VIRQ_DOM_EXC) + self.virqHandler = None + + def bind_virq(self, virq): + port = self.notifier.bind_virq(virq) + self.virqPorts[port] = virq + + def virq(self): + self.notifier.virq_send(self.virqPort) + + def start(self): + """Fork a thread to read messages. + """ + if self.thread: return + self.thread = threading.Thread(name="ChannelFactory", + target=self.main) + self.thread.setDaemon(True) + self.thread.start() + + def stop(self): + """Signal the thread to stop. + """ + self.thread = None + + def main(self): + """Main routine for the thread. + """ + while True: + if self.thread == None: return + port = self.notifier.read() + if port: + virq = self.virqPorts.get(port) + if virq is not None: + self.virqReceived(virq) + else: + self.msgReceived(port) + else: + select.select([self.notifier], [], [], 1.0) + + def msgReceived(self, port): + # We run the message handlers in their own threads. + # Note we use keyword args to lambda to save the values - + # otherwise lambda will use the variables, which will get + # assigned by the loop and the lambda will get the changed values. + for chan in self.channels.values(): + if self.thread == None: return + msg = chan.readResponse() + if msg: + chan.responseReceived(msg) + for chan in self.channels.values(): + if self.thread == None: return + msg = chan.readRequest() + if msg: + self.runInThread(lambda chan=chan, msg=msg: chan.requestReceived(msg)) + + def runInThread(self, thunk): + thread = threading.Thread(target = thunk) + thread.setDaemon(True) + thread.start() + + def setVirqHandler(self, virqHandler): + self.virqHandler = virqHandler + + def virqReceived(self, virq): + if 1 or DEBUG: + print 'virqReceived>', virq + if not self.virqHandler: return + self.runInThread(lambda virq=virq: self.virqHandler(virq)) + + def newChannel(self, dom, local_port, remote_port): + """Create a new channel. + """ + return self.addChannel(Channel(self, dom, local_port, remote_port)) def addChannel(self, channel): - """Add a channel. Registers with the notifier. + """Add a channel. """ - idx = channel.idx - self.channels[idx] = channel - self.notifier.bind(idx) + self.channels[channel.getKey()] = channel + return channel - def getChannel(self, idx): - """Get the channel with the given index (if any). + def delChannel(self, channel): + """Remove the channel. """ - return self.channels.get(idx) + key = channel.getKey() + if key in self.channels: + del self.channels[key] - def delChannel(self, idx): - """Remove the channel with the given index (if any). - Deregisters with the notifier. + def getChannel(self, dom, local_port, remote_port): + """Get the channel with the given domain and ports (if any). """ - if idx in self.channels: - del self.channels[idx] - self.notifier.unbind(idx) + key = (dom, local_port, remote_port) + return self.channels.get(key) - def domChannel(self, dom, local_port=0, remote_port=0): - """Get the channel for the given domain. - Construct if necessary. + def findChannel(self, dom, local_port=0, remote_port=0): + """Find a channel. Ports given as zero are wildcards. dom domain returns channel """ - chan = self.getDomChannel(dom) - if not chan: - chan = Channel(self, dom, local_port=local_port, - remote_port=remote_port) - self.addChannel(chan) - return chan - - def getDomChannel(self, dom): - """Get the channel for the given domain. - - dom domain - - returns channel (or None) - """ - dom = int(dom) - for chan in self.channels.values(): - if not isinstance(chan, Channel): continue - if chan.dom == dom: - return chan + chan = self.getChannel(dom, local_port, remote_port) + if chan: return chan + if local_port and remote_port: + return None + for c in self.channels.values(): + if c.dom != dom: continue + if local_port and local_port != c.getLocalPort(): continue + if remote_port and remote_port != c.getRemotePort(): continue + return c return None - - def virqChannel(self, virq): - """Get the channel for the given virq. - Construct if necessary. - """ - for chan in self.channels.values(): - if not isinstance(chan, VirqChannel): continue - if chan.virq == virq: - return chan - chan = VirqChannel(self, virq) - self.addChannel(chan) - return chan - - def channelClosed(self, channel): - """The given channel has been closed - remove it. - """ - self.delChannel(channel.idx) + def openChannel(self, dom, local_port=0, remote_port=0): + return (self.findChannel(dom, local_port=local_port, remote_port=remote_port) + or + self.newChannel(dom, local_port, remote_port)) def createPort(self, dom, local_port=0, remote_port=0): """Create a port for a channel to the given domain. @@ -147,122 +215,54 @@ def channelFactory(): inst = ChannelFactory() return inst -class BaseChannel: - """Abstract superclass for channels. - - The subclass constructor must set idx to the port to use. - """ - - def __init__(self, factory): - self.factory = factory - self.idx = -1 - self.closed = 0 - - def getIndex(self): - """Get the channel index. - """ - return self.idx - - def notificationReceived(self): - """Called when a notification is received. - Calls handleNotification(), which should be defined - in a subclass. - """ - if self.closed: return - self.handleNotification() - - def close(self): - """Close the channel. Calls channelClosed() on the factory. - Override in subclass. - """ - self.factory.channelClosed(self) - - def handleNotification(self): - """Handle notification. - Define in subclass. - """ - pass - - -class VirqChannel(BaseChannel): - """A channel for handling a virq. - """ - - def __init__(self, factory, virq): - """Create a channel for the given virq using the given factory. +class Channel: - Do not call directly, use virqChannel on the factory. - """ - BaseChannel.__init__(self, factory) - self.virq = virq + def __init__(self, factory, dom, local_port, remote_port): self.factory = factory - # Notification port (int). - #self.port = xc.evtchn_bind_virq(virq) - self.port = factory.notifier.bind_virq(virq) - self.idx = self.port - # Clients to call when a virq arrives. - self.clients = [] - - def __repr__(self): - return ('' - % (self.virq, self.port)) + self.dom = int(dom) + # Registered devices. + self.devs = [] + # Devices indexed by the message types they handle. + self.devs_by_type = {} + self.port = self.factory.createPort(self.dom, + local_port=local_port, + remote_port=remote_port) + self.closed = False + self.queue = ResponseQueue(self) + # Make sure the port will deliver all the messages. + self.port.register(TYPE_WILDCARD) - def getVirq(self): - """Get the channel's virq. + def getKey(self): + """Get the channel key. """ - return self.virq + return (self.dom, self.getLocalPort(), self.getRemotePort()) + + def sxpr(self): + val = ['channel'] + val.append(['domain', self.dom]) + if self.port: + val.append(['local_port', self.port.local_port]) + val.append(['remote_port', self.port.remote_port]) + return val def close(self): - """Close the channel. Calls lostChannel(self) on all its clients and - channelClosed() on the factory. - """ - for c in self.clients[:]: - c.lostChannel(self) - self.clients = [] - BaseChannel.close(self) - - def registerClient(self, client): - """Register a client. The client will be called with - client.virqReceived(virq) when a virq is received. - The client will be called with client.lostChannel(self) if the - channel is closed. + """Close the channel. """ - self.clients.append(client) - - def handleNotification(self): - for c in self.clients: - c.virqReceived(self.virq) - - def notify(self): - # xc.evtchn_send(self.port) - self.factory.notifier.virq_send(self.port) - - -class Channel(BaseChannel): - """A control channel to a domain. Messages for the domain device controllers - are multiplexed over the channel (console, block devs, net devs). - """ - - def __init__(self, factory, dom, local_port=0, remote_port=0): - """Create a channel to the given domain using the given factory. - - Do not call directly, use domChannel on the factory. - """ - BaseChannel.__init__(self, factory) - # Domain. - self.dom = int(dom) - # Domain port (object). - self.port = self.factory.createPort(dom, local_port=local_port, - remote_port=remote_port) - # Channel port (int). - self.idx = self.port.local_port - # Registered devices. + if DEBUG: + print 'Channel>close>', self + if self.closed: return + self.closed = True + self.factory.delChannel(self) + for d in self.devs[:]: + d.lostChannel(self) self.devs = [] - # Devices indexed by the message types they handle. self.devs_by_type = {} - # Output queue. - self.queue = [] - self.closed = 0 + if self.port: + self.port.close() + #self.port = None + + def getDomain(self): + return self.dom def getLocalPort(self): """Get the local port. @@ -282,18 +282,12 @@ class Channel(BaseChannel): if self.closed: return -1 return self.port.remote_port - def close(self): - """Close the channel. Calls lostChannel() on all its devices and - channelClosed() on the factory. - """ - if self.closed: return - self.closed = 1 - for d in self.devs[:]: - d.lostChannel() - self.factory.channelClosed(self) - self.devs = [] - self.devs_by_type = {} - self.port.disconnect() + def __repr__(self): + return ('' + % (self.dom, + self.getLocalPort(), + self.getRemotePort())) + def registerDevice(self, types, dev): """Register a device controller. @@ -306,7 +300,6 @@ class Channel(BaseChannel): self.devs.append(dev) for ty in types: self.devs_by_type[ty] = dev - self.port.register(ty) def deregisterDevice(self, dev): """Remove the registration for a device controller. @@ -318,7 +311,6 @@ class Channel(BaseChannel): types = [ ty for (ty, d) in self.devs_by_type.items() if d == dev ] for ty in types: del self.devs_by_type[ty] - self.port.deregister(ty) def getDevice(self, type): """Get the device controller handling a message type. @@ -330,130 +322,160 @@ class Channel(BaseChannel): """ return self.devs_by_type.get(type) - def getMessageType(self, msg): - """Get a 2-tuple of the message type and subtype. - - @param msg: message - @type msg: xu message - @return: type info - @rtype: (int, int) - """ - hdr = msg.get_header() - return (hdr['type'], hdr.get('subtype')) - - def __repr__(self): - return ('' - % (self.dom, - self.getLocalPort(), - self.getRemotePort())) - - def handleNotification(self): - """Process outstanding messages in repsonse to notification on the port. - """ - if self.closed: - print 'handleNotification> Notification on closed channel', self - return - work = 0 - work += self.handleRequests() - work += self.handleResponses() - work += self.handleWrites() - if work: - self.notify() - - def notify(self): - """Notify the other end of the port that messages have been processed. - """ - if self.closed: return - self.port.notify() - - def handleRequests(self): - work = 0 - while 1: - msg = self.readRequest() - if not msg: break - self.requestReceived(msg) - work += 1 - return work - def requestReceived(self, msg): - (ty, subty) = self.getMessageType(msg) - #todo: Must respond before writing any more messages. - #todo: Should automate this (respond on write) - responded = 0 + if DEBUG: + print 'Channel>requestReceived>', self, + printMsg(msg) + (ty, subty) = getMessageType(msg) + responded = False dev = self.getDevice(ty) if dev: responded = dev.requestReceived(msg, ty, subty) + elif DEBUG: + print "Channel>requestReceived> No device", self, + printMsg(msg) else: - print ("requestReceived> No device: Message type %s %d:%d" - % (msgTypeName(ty, subty), ty, subty)), self + pass if not responded: - self.port.write_response(msg) - - def handleResponses(self): - work = 0 - while 1: - msg = self.readResponse() - if not msg: break - self.responseReceived(msg) - work += 1 - return work - - def responseReceived(self, msg): - (ty, subty) = self.getMessageType(msg) - dev = self.getDevice(ty) - if dev: - dev.responseReceived(msg, ty, subty) - else: - print ("responseReceived> No device: Message type %d:%d" - % (msgTypeName(ty, subty), ty, subty)), self - - def handleWrites(self): - work = 0 - # Pull data from producers. - for dev in self.devs: - work += dev.produceRequests() - # Flush the queue. - while self.queue and self.port.space_to_write_request(): - msg = self.queue.pop(0) - self.port.write_request(msg) - work += 1 - return work - - def writeRequest(self, msg, notify=1): - if self.closed: - val = -1 - elif self.writeReady(): - self.port.write_request(msg) - if notify: self.notify() - val = 1 - else: - self.queue.append(msg) - val = 0 - return val + self.writeResponse(msg) - def writeResponse(self, msg): + def writeRequest(self, msg): + if DEBUG: + print 'Channel>writeRequest>', self, + printMsg(msg, all=True) if self.closed: return -1 - self.port.write_response(msg) + self.port.write_request(msg) return 1 - def writeReady(self): - if self.closed or self.queue: return 0 - return self.port.space_to_write_request() + def writeResponse(self, msg): + if DEBUG: + print 'Channel>writeResponse>', self, + printMsg(msg, all=True) + if self.port: + self.port.write_response(msg) + return 1 def readRequest(self): if self.closed: - return None - if self.port.request_to_read(): - val = self.port.read_request() + val = None else: - val = None + val = self.port.read_request() return val def readResponse(self): if self.closed: - return None - if self.port.response_to_read(): - val = self.port.read_response() - else: val = None + else: + val = self.port.read_response() + if DEBUG and val: + print 'Channel>readResponse>', self, + printMsg(val, all=True) return val + + def requestResponse(self, msg, timeout=None): + """Write a request and wait for a response. + Raises IOError on timeout. + + @param msg request message + @param timeout timeout (0 is forever) + @return response message + """ + if self.closed: + raise IOError("closed") + if self.closed: + return None + if timeout is None: + timeout = RESPONSE_TIMEOUT + elif timeout <= 0: + timeout = None + return self.queue.call(msg, timeout) + + def responseReceived(self, msg): + if DEBUG: + print 'Channel>responseReceived>', self, + printMsg(msg) + self.queue.response(getMessageId(msg), msg) + + def virq(self): + self.factory.virq() + + +class Response: + """Entry in the response queue. + Used to signal a response to a message. + """ + + def __init__(self, mid): + self.mid = mid + self.msg = None + self.ready = threading.Event() + + def response(self, msg): + """Signal arrival of a response to a waiting thread. + Passing msg None cancels the wait with an IOError. + """ + if msg: + self.msg = msg + else: + self.mid = -1 + self.ready.set() + + def wait(self, timeout): + """Wait up to 'timeout' seconds for a response. + Returns the response or raises an IOError. + """ + self.ready.wait(timeout) + if self.mid < 0: + raise IOError("wait canceled") + if self.msg is None: + raise IOError("response timeout") + return self.msg + +class ResponseQueue: + """Response queue. Manages waiters for responses to messages. + """ + + def __init__(self, channel): + self.channel = channel + self.lock = threading.Lock() + self.responses = {} + + def add(self, mid): + r = Response(mid) + self.responses[mid] = r + return r + + def get(self, mid): + return self.responses.get(mid) + + def remove(self, mid): + r = self.responses.get(mid) + if r: + del self.responses[mid] + return r + + def response(self, mid, msg): + """Process a response. + """ + try: + self.lock.acquire() + r = self.remove(mid) + finally: + self.lock.release() + if r: + r.response(msg) + + def call(self, msg, timeout): + """Send the message and wait for 'timeout' seconds for a response. + Returns the response. + Raises IOError on timeout. + """ + mid = getMessageId(msg) + try: + self.lock.acquire() + r = self.add(mid) + finally: + self.lock.release() + self.channel.writeRequest(msg) + return r.wait(timeout) + diff --git a/tools/python/xen/xend/server/console.py b/tools/python/xen/xend/server/console.py index efe85acccc..bf4d142593 100755 --- a/tools/python/xen/xend/server/console.py +++ b/tools/python/xen/xend/server/console.py @@ -2,19 +2,17 @@ import socket -from twisted.internet import reactor -from twisted.internet import protocol +from twisted.internet import reactor, protocol from xen.lowlevel import xu from xen.xend.XendError import XendError -from xen.xend import EventServer -eserver = EventServer.instance() +from xen.xend import EventServer; eserver = EventServer.instance() from xen.xend.XendLogging import log -from xen.xend import XendRoot -xroot = XendRoot.instance() +from xen.xend import XendRoot; xroot = XendRoot.instance() +from xen.xend import sxp -import controller +from controller import CtrlMsgRcvr, Dev, DevController from messages import * from params import * @@ -22,31 +20,28 @@ class ConsoleProtocol(protocol.Protocol): """Asynchronous handler for a console TCP socket. """ - def __init__(self, controller, idx): - self.controller = controller - self.idx = idx + def __init__(self, console, id): + self.console = console + self.id = id self.addr = None self.binary = 0 def connectionMade(self): peer = self.transport.getPeer() self.addr = (peer.host, peer.port) - if self.controller.connect(self.addr, self): + if self.console.connect(self.addr, self): self.transport.write("Cannot connect to console %d on domain %d\n" - % (self.idx, self.controller.dom)) + % (self.id, self.console.dom)) self.loseConnection() return else: - # KAF: A nice quiet successful connect. - #self.transport.write("Connected to console %d on domain %d\n" - # % (self.idx, self.controller.dom)) log.info("Console connected %s %s %s", - self.idx, str(self.addr[0]), str(self.addr[1])) + self.id, str(self.addr[0]), str(self.addr[1])) eserver.inject('xend.console.connect', - [self.idx, self.addr[0], self.addr[1]]) + [self.id, self.addr[0], self.addr[1]]) def dataReceived(self, data): - if self.controller.handleInput(self, data): + if self.console.receiveInput(self, data): self.loseConnection() def write(self, data): @@ -55,10 +50,10 @@ class ConsoleProtocol(protocol.Protocol): def connectionLost(self, reason=None): log.info("Console disconnected %s %s %s", - self.idx, str(self.addr[0]), str(self.addr[1])) + str(self.id), str(self.addr[0]), str(self.addr[1])) eserver.inject('xend.console.disconnect', - [self.idx, self.addr[0], self.addr[1]]) - self.controller.disconnect(conn=self) + [self.id, self.addr[0], self.addr[1]]) + self.console.disconnect(conn=self) def loseConnection(self): self.transport.loseConnection() @@ -68,42 +63,19 @@ class ConsoleFactory(protocol.ServerFactory): """ protocol = ConsoleProtocol - def __init__(self, controller, idx): + def __init__(self, console, id): #protocol.ServerFactory.__init__(self) - self.controller = controller - self.idx = idx + self.console = console + self.id = id def buildProtocol(self, addr): - proto = self.protocol(self.controller, self.idx) + proto = self.protocol(self.console, self.id) proto.factory = self return proto -class ConsoleControllerFactory(controller.ControllerFactory): - """Factory for creating console controllers. - """ - - def createController(self, dom, console_port=None): - if console_port is None: - console_port = xroot.get_console_port_base() + dom - for c in self.getControllers(): - if c.console_port == console_port: - raise XendError('console port in use: ' + str(console_port)) - console = ConsoleController(self, dom, console_port) - self.addController(console) - log.info("Created console id=%s domain=%d port=%d", - console.idx, console.dom, console.console_port) - eserver.inject('xend.console.create', - [console.idx, console.dom, console.console_port]) - return console - - def consoleClosed(self, console): - log.info("Closed console id=%s", console.idx) - eserver.inject('xend.console.close', console.idx) - self.delController(console) - -class ConsoleController(controller.Controller): - """Console controller for a domain. - Does not poll for i/o itself, but relies on the notifier to post console +class ConsoleDev(Dev): + """Console device for a domain. + Does not poll for i/o itself, but relies on the domain to post console output and the connected TCP sockets to post console input. """ @@ -112,28 +84,53 @@ class ConsoleController(controller.Controller): STATUS_CONNECTED = 'connected' STATUS_LISTENING = 'listening' - def __init__(self, factory, dom, console_port): - controller.Controller.__init__(self, factory, dom) - self.addMethod(CMSG_CONSOLE, 0, None) + def __init__(self, controller, id, config, recreate=False): + print 'Console>' + Dev.__init__(self, controller, id, config) self.status = self.STATUS_NEW self.addr = None self.conn = None - self.rbuf = xu.buffer() - self.wbuf = xu.buffer() + self.console_port = None + self.obuf = xu.buffer() + self.ibuf = xu.buffer() + self.channel = None + self.listener = None + + console_port = sxp.child_value(self.config, "console_port") + if console_port is None: + console_port = xroot.get_console_port_base() + self.getDomain() + self.checkConsolePort(console_port) self.console_port = console_port + + log.info("Created console id=%d domain=%d port=%d", + self.id, self.getDomain(), self.console_port) + eserver.inject('xend.console.create', + [self.id, self.getDomain(), self.console_port]) - self.registerChannel() - self.listener = None + def init(self, recreate=False, reboot=False): + print 'Console>init>' + self.destroyed = False + self.channel = self.getChannel() self.listen() + def checkConsolePort(self, console_port): + """Check that a console port is not in use by another console. + """ + xd = XendRoot.get_component('xen.xend.XendDomain') + for vm in xd.domains(): + ctrl = vm.getDeviceController(self.getType(), error=False) + if (not ctrl): continue + ctrl.checkConsolePort(console_port) + def sxpr(self): val = ['console', ['status', self.status ], - ['id', self.idx ], - ['domain', self.dom ] ] + ['id', self.id ], + ['domain', self.getDomain() ] ] val.append(['local_port', self.getLocalPort() ]) val.append(['remote_port', self.getRemotePort() ]) val.append(['console_port', self.console_port ]) + val.append(['index', self.getIndex()]) if self.addr: val.append(['connected', self.addr[0], self.addr[1]]) return val @@ -159,42 +156,33 @@ class ConsoleController(controller.Controller): host = socket.gethostname() return "telnet://%s:%d" % (host, self.console_port) - def ready(self): - return not (self.closed() or self.rbuf.empty()) - def closed(self): return self.status == self.STATUS_CLOSED def connected(self): return self.status == self.STATUS_CONNECTED - def close(self): - """Close the console controller. - """ - self.lostChannel() - - def lostChannel(self): - """The channel to the domain has been lost. - Cleanup: disconnect TCP connections and listeners, notify the controller. + def destroy(self, change=False, reboot=False): + """Close the console. """ + if reboot: + return self.status = self.STATUS_CLOSED if self.conn: self.conn.loseConnection() self.listener.stopListening() - controller.Controller.lostChannel(self) def listen(self): """Listen for TCP connections to the console port.. """ if self.closed(): return - self.status = self.STATUS_LISTENING if self.listener: - #self.listener.startListening() pass else: - f = ConsoleFactory(self, self.idx) + self.status = self.STATUS_LISTENING + cf = ConsoleFactory(self, self.id) interface = xroot.get_console_address() - self.listener = reactor.listenTCP(self.console_port, f, interface=interface) + self.listener = reactor.listenTCP(self.console_port, cf, interface=interface) def connect(self, addr, conn): """Connect a TCP connection to the console. @@ -210,7 +198,7 @@ class ConsoleController(controller.Controller): self.addr = addr self.conn = conn self.status = self.STATUS_CONNECTED - self.handleOutput() + self.writeOutput() return 0 def disconnect(self, conn=None): @@ -221,68 +209,107 @@ class ConsoleController(controller.Controller): self.conn.loseConnection() self.addr = None self.conn = None + self.status = self.STATUS_LISTENING self.listen() - def requestReceived(self, msg, type, subtype): - """Receive console data from the console channel. + def receiveOutput(self, msg): + """Receive output console data from the console channel. msg console message type major message type subtype minor message typ """ - self.rbuf.write(msg.get_payload()) - self.handleOutput() + # Treat the obuf as a ring buffer. + data = msg.get_payload() + data_n = len(data) + if self.obuf.space() < data_n: + self.obuf.discard(data_n) + if self.obuf.space() < data_n: + data = data[-self.obuf.space():] + self.obuf.write(data) + self.writeOutput() - def responseReceived(self, msg, type, subtype): - """Handle a response to a request written to the console channel. - Just ignore it because the return values are not interesting. + def writeOutput(self): + """Handle buffered output from the console device. + Sends it to the connected TCP connection (if any). + """ + if self.closed(): + return -1 + if not self.conn: + return 0 + while not self.obuf.empty(): + try: + bytes = self.conn.write(self.obuf.peek()) + if bytes > 0: + self.obuf.discard(bytes) + except socket.error: + pass + return 0 + + def receiveInput(self, conn, data): + """Receive console input from a TCP connection. Ignores the + input if the calling connection (conn) is not the one + connected to the console (self.conn). - msg console message - type major message type - subtype minor message typ + conn connection + data input data """ - pass + if self.closed(): return -1 + if conn != self.conn: return 0 + self.ibuf.write(data) + self.writeInput() + return 0 - def produceRequests(self): - """Write pending console data to the console channel. + def writeInput(self): + """Write pending console input to the console channel. Writes as much to the channel as it can. """ - work = 0 - while self.channel and not self.wbuf.empty() and self.channel.writeReady(): + while self.channel and not self.ibuf.empty(): msg = xu.message(CMSG_CONSOLE, 0, 0) - msg.append_payload(self.wbuf.read(msg.MAX_PAYLOAD)) - work += self.channel.writeRequest(msg, notify=0) - return work + msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD)) + self.channel.writeRequest(msg) - def handleInput(self, conn, data): - """Handle some external input aimed at the console. - Called from a TCP connection (conn). Ignores the input - if the calling connection (conn) is not the one connected - to the console (self.conn). +class ConsoleController(DevController): + """Device controller for all the consoles for a domain. + """ - conn connection - data input data + def __init__(self, dctype, vm, recreate=False): + DevController.__init__(self, dctype, vm, recreate=recreate) + self.rcvr = None + + def initController(self, recreate=False, reboot=False): + self.destroyed = False + self.rcvr = CtrlMsgRcvr(self.getChannel()) + self.rcvr.addHandler(CMSG_CONSOLE, + 0, + self.receiveOutput) + self.rcvr.registerChannel() + if reboot: + self.rebootDevices() + + def destroyController(self, reboot=False): + self.destroyed = True + self.destroyDevices(reboot=reboot) + self.rcvr.deregisterChannel() + + def newDevice(self, id, config, recreate=False): + return ConsoleDev(self, id, config, recreate=recreate) + + def checkConsolePort(self, console_port): + """Check that a console port is not in use by a console. """ - if self.closed(): return -1 - if conn != self.conn: return 0 - self.wbuf.write(data) - if self.channel and self.produceRequests(): - self.channel.notify() - return 0 + for c in self.getDevices(): + if c.console_port == console_port: + raise XendError('console port in use: ' + str(console_port)) - def handleOutput(self): - """Handle buffered output from the console. - Sends it to the connected console (if any). + def receiveOutput(self, msg): + """Handle a control request. + The CMSG_CONSOLE messages just contain data, and no console id, + so just send to console 0 (if there is one). + + todo: extend CMSG_CONSOLE to support more than one console? """ - if self.closed(): - return -1 - if not self.conn: - return 0 - while not self.rbuf.empty(): - try: - bytes = self.conn.write(self.rbuf.peek()) - if bytes > 0: - self.rbuf.discard(bytes) - except socket.error, error: - pass - return 0 + console = self.getDevice(0) + if console: + console.receiveOutput(msg) + diff --git a/tools/python/xen/xend/server/controller.py b/tools/python/xen/xend/server/controller.py index c8962d4675..43a500f539 100755 --- a/tools/python/xen/xend/server/controller.py +++ b/tools/python/xen/xend/server/controller.py @@ -3,84 +3,29 @@ for a domain. """ -from twisted.internet import defer -#defer.Deferred.debug = 1 - -import channel -from messages import msgTypeName, printMsg +from xen.xend.XendError import XendError +from messages import msgTypeName, printMsg, getMessageType DEBUG = 0 -class Responder: - """Handler for a response to a message with a specified id. - """ - - def __init__(self, mid, deferred): - """Create a responder. - - @param mid: message id of response to handle - @type mid: int - @param deferred: deferred object holding the callbacks - @type deferred: Deferred - """ - self.mid = mid - self.deferred = deferred - - def responseReceived(self, msg): - """Entry point called when a response message with the right id arrives. - Calls callback on I{self.deferred} with the message. - - @param msg: response message - @type msg: xu message - """ - if self.deferred.called: return - self.deferred.callback(msg) - - def error(self, err): - """Entry point called when there has been an error. - Calls errback on I{self.deferred} with the error. - - @param err: error - @type err: Exception - """ - if self.deferred.called: return - self.deferred.errback(err) - class CtrlMsgRcvr: - """Abstract class for things that deal with a control interface to a domain. + """Dispatcher class for messages on a control channel. Once I{registerChannel} has been called, our message types are registered - with the channel to the domain. The channel will call I{requestReceived} - when a request arrives, or I{responseReceived} when a response arrives, - if they have one of our message types. + with the channel. The channel will call I{requestReceived} + when a request arrives if it has one of our message types. - @ivar dom: the domain we are a control interface for - @type dom: int + @ivar channel: channel to a domain + @type channel: Channel @ivar majorTypes: major message types we are interested in @type majorTypes: {int:{int:method}} - @ivar timeout: timeout (in seconds) for message handlers - @type timeout: int - @ivar channel: channel to the domain - @type channel: Channel - @ivar idx: channel index - @ivar idx: string - @ivar responders: table of message response handlers - @type responders: {int:Responder} """ - def __init__(self): - self.channelFactory = channel.channelFactory() + def __init__(self, channel): self.majorTypes = {} - self.dom = None - self.channel = None - self.idx = None - self.responders = {} - self.timeout = 10 - - def setTimeout(self, timeout): - self.timeout = timeout + self.channel = channel - def getMethod(self, type, subtype): + def getHandler(self, type, subtype): """Get the method for a type and subtype. @param type: major message type @@ -93,7 +38,7 @@ class CtrlMsgRcvr: method = subtypes.get(subtype) return method - def addMethod(self, type, subtype, method): + def addHandler(self, type, subtype, method): """Add a method to handle a message type and subtype. @param type: major message type @@ -124,102 +69,31 @@ class CtrlMsgRcvr: """ if DEBUG: print 'requestReceived>', - printMsg(msg, all=1) + printMsg(msg, all=True) responded = 0 - method = self.getMethod(type, subtype) + method = self.getHandler(type, subtype) if method: - responded = method(msg, 1) + responded = method(msg) elif DEBUG: print ('requestReceived> No handler: Message type %s %d:%d' % (msgTypeName(type, subtype), type, subtype)), self return responded - def responseReceived(self, msg, type, subtype): - """Dispatch a response to handlers. - Called by the channel for responses with one of our types. - - First looks for a message responder for the message's id. - See L{callResponders}, L{addResponder}. - If there is no responder, looks for a message handler for - the message type/subtype. - - @param msg: message - @type msg: xu message - @param type: major message type - @type type: int - @param subtype: minor message type - @type subtype: int - """ - if DEBUG: - print 'responseReceived>', - printMsg(msg, all=1) - if self.callResponders(msg): - return - method = self.getMethod(type, subtype) - if method: - method(msg, 0) - elif DEBUG: - print ('responseReceived> No handler: Message type %s %d:%d' - % (msgTypeName(type, subtype), type, subtype)), self - - def addResponder(self, mid, deferred): - """Add a responder for a message id. - The I{deferred} is called with callback(msg) when a response - with message id I{mid} arrives. - - Responders have a timeout set and I{deferred} will error - on expiry. - - @param mid: message id of response expected - @type mid: int - @param deferred: handler for the response - @type deferred: Deferred - @return: responder - @rtype: Responder - """ - resp = Responder(mid, deferred) - self.responders[resp.mid] = resp - if self.timeout > 0: - deferred.setTimeout(self.timeout) - return resp - - def callResponders(self, msg): - """Call any waiting responders for a response message. - Looks for a responder registered for the message's id. - See L{addResponder}. - - @param msg: response message - @type msg: xu message - @return: 1 if there was a responder for the message, 0 otherwise - @rtype : bool - """ - hdr = msg.get_header() - mid = hdr['id'] - handled = 0 - resp = self.responders.get(mid) - if resp: - handled = 1 - resp.responseReceived(msg) - del self.responders[mid] - # Clean up called responders. - for resp in self.responders.values(): - if resp.deferred.called: - del self.responders[resp.mid] - return handled def lostChannel(self): """Called when the channel to the domain is lost. """ - pass + print 'CtrlMsgRcvr>lostChannel>', + self.channel = None def registerChannel(self): """Register interest in our major message types with the channel to our domain. Once we have registered, the channel - will call requestReceived or responseReceived for our messages. + will call requestReceived for our messages. """ - self.channel = self.channelFactory.domChannel(self.dom) - self.idx = self.channel.getIndex() - if self.majorTypes: + if DEBUG: + print 'CtrlMsgRcvr>registerChannel>', self.channel, self.getMajorTypes() + if self.channel: self.channel.registerDevice(self.getMajorTypes(), self) def deregisterChannel(self): @@ -229,470 +103,340 @@ class CtrlMsgRcvr: """ if self.channel: self.channel.deregisterDevice(self) - self.channel = None - - def produceRequests(self): - """Produce any queued requests. - @return: number produced - @rtype: int - """ - return 0 - - def writeRequest(self, msg, response=None): - """Write a request to the channel. - - @param msg: request message - @type msg: xu message - @param response: response handler - @type response: Deferred - """ - if self.channel: - if DEBUG: - print 'CtrlMsgRcvr>writeRequest>', - printMsg(msg, all=1) - if response: - self.addResponder(msg.get_header()['id'], response) - self.channel.writeRequest(msg) - else: - print 'CtrlMsgRcvr>writeRequest>', 'no channel!', self - - def writeResponse(self, msg): - """Write a response to the channel. This acknowledges - a request message. - - @param msg: message - @type msg: xu message - """ - if self.channel: - if DEBUG: - print 'CtrlMsgRcvr>writeResponse>', - printMsg(msg, all=0) - self.channel.writeResponse(msg) - else: - print 'CtrlMsgRcvr>writeResponse>', 'no channel!', self - -class ControllerFactory: - """Abstract class for factories creating controllers for a domain. - Maintains a table of controllers. - - @ivar controllers: mapping of index to controller instance - @type controllers: {String: Controller} - @ivar dom: domain - @type dom: int +class DevControllerType: + """Abstract class for device controller types. """ - def __init__(self): - self.controllers = {} - - def addController(self, controller): - """Add a controller instance (under its index). - """ - self.controllers[controller.idx] = controller - - def getControllers(self): - """Get a list of all controllers. - """ - return self.controllers.values() + def __init__(self, type): + self.type = type - def getControllerByIndex(self, idx): - """Get a controller from its index. + def getType(self): + """Get the device controller type name. """ - return self.controllers.get(idx) - - def getControllerByDom(self, dom): - """Get the controller for the given domain. - - @param dom: domain id - @type dom: int - @return: controller or None - """ - for inst in self.controllers.values(): - if inst.dom == dom: - return inst - return None - - def getController(self, dom): - """Create or find the controller for a domain. + return self.type - @param dom: domain - @return: controller - """ - ctrl = self.getControllerByDom(dom) - if ctrl is None: - ctrl = self.createController(dom) - self.addController(ctrl) - return ctrl - - def createController(self, dom): - """Create a controller. Define in a subclass. - - @param dom: domain - @type dom: int - @return: controller instance - @rtype: Controller (or subclass) + def createDevController(self, vm, recreate=False): + """Create a device controller for a domain. + Must be implemented in subclass. """ raise NotImplementedError() - def delController(self, controller): - """Delete a controller instance from the table. - - @param controller: controller instance - """ - if controller.idx in self.controllers: - del self.controllers[controller.idx] - - def controllerClosed(self, controller): - """Callback called when a controller is closed (usually by the controller). - - @param controller: controller instance - """ - self.delController(controller) - -class Controller(CtrlMsgRcvr): - """Abstract class for a device controller attached to a domain. - - @ivar factory: controller factory - @type factory: ControllerFactory - @ivar dom: domain - @type dom: int - @ivar channel: channel to the domain - @type channel: Channel - @ivar idx: channel index - @type idx: String +class SimpleDevControllerType(DevControllerType): + """Device controller type that simply wraps a controller + class and uses its constructor to create instances. """ + + def __init__(self, type, devControllerClass): + DevControllerType.__init__(self, type) + self.devControllerClass = devControllerClass - def __init__(self, factory, dom): - CtrlMsgRcvr.__init__(self) - self.factory = factory - self.dom = int(dom) - self.channel = None - self.idx = None - - def close(self): - """Close the controller. - """ - self.lostChannel() - - def lostChannel(self): - """The controller channel has been lost. + def createDevController(self, vm, recreate=False): + """Create a device controller for a domain. """ - self.deregisterChannel() - self.factory.controllerClosed(self) + ctrl = self.devControllerClass(self, vm, recreate=recreate) + ctrl.initController(recreate=recreate) + return ctrl -class SplitControllerFactory(ControllerFactory): - """Abstract class for factories creating split controllers for a domain. - Maintains a table of backend controllers. +class DevControllerTable: + """Table of device controller types, indexed by type name. """ def __init__(self): - ControllerFactory.__init__(self) - self.backendControllers = {} - - def getBackendControllers(self): - return self.backendControllers.values() - - def getBackendControllerByDomain(self, dom): - """Get the backend controller for a domain if there is one. - - @param dom: backend domain - @return: backend controller - """ - return self.backendControllers.get(dom) - - def getBackendController(self, dom): - """Get the backend controller for a domain, creating - if necessary. - - @param dom: backend domain - @return: backend controller - """ - b = self.getBackendControllerByDomain(dom) - if b is None: - b = self.createBackendController(dom) - self.backendControllers[b.dom] = b - return b + self.controllerTypes = {} + + def getDevControllerType(self, type): + return self.controllerTypes.get(type) + + def addDevControllerType(self, dctype): + self.controllerTypes[dctype.getType()] = dctype + return dctype + + def delDevControllerType(self, type): + if type in self.controllerTypes: + del self.controllerTypes[type] + + def createDevController(self, type, vm, recreate=False): + dctype = self.getDevControllerType(type) + if not dctype: + raise XendError("unknown device type: " + type) + return dctype.createDevController(vm, recreate=recreate) + +def getDevControllerTable(): + global devControllerTable + try: + devControllerTable + except: + devControllerTable = DevControllerTable() + return devControllerTable + +def addDevControllerType(dctype): + return getDevControllerTable().addDevControllerType(dctype) + +def addDevControllerClass(name, klass): + ty = SimpleDevControllerType(name, klass) + return addDevControllerType(ty) - def createBackendController(self, dom): - """Create a backend controller. Define in a subclass. +def createDevController(name, vm, recreate=False): + return getDevControllerTable().createDevController(name, vm, recreate=recreate) - @param dom: backend domain - @return: backend controller - """ - raise NotImplementedError() +class DevController: + """Abstract class for a device controller attached to a domain. + A device controller manages all the devices of a given type for a domain. + There is exactly one device controller for each device type for + a domain. - def delBackendController(self, ctrlr): - """Remove a backend controller. + """ - @param ctrlr: backend controller - """ - if ctrlr.dom in self.backendControllers: - del self.backendControllers[ctrlr.dom] + def __init__(self, dctype, vm, recreate=False): + self.dctype = dctype + self.destroyed = False + self.vm = vm + self.deviceId = 0 + self.devices = {} + self.device_order = [] - def backendControllerClosed(self, ctrlr): - """Callback called when a backend is closed. - """ - self.delBackendController(ctrlr) - - def createBackendInterface(self, ctrl, dom, handle): - """Create a backend interface. Define in a subclass. + def getType(self): + return self.dctype.getType() - @param ctrl: frontend controller - @param dom: backend domain - @return: backend interface - """ - raise NotImplementedError() + def getDevControllerType(self): + return self.dctype -class BackendController(Controller): - """Abstract class for a backend device controller attached to a domain. + def getDomain(self): + return self.vm.getDomain() - @ivar factory: backend controller factory - @type factory: BackendControllerFactory - @ivar dom: backend domain - @type dom: int - @ivar channel: channel to the domain - @type channel: Channel - """ + def getDomainName(self): + return self.vm.getName() + def getChannel(self): + chan = self.vm.getChannel() + return chan - def __init__(self, factory, dom): - CtrlMsgRcvr.__init__(self) - self.factory = factory - self.dom = int(dom) - self.channel = None - self.backendInterfaces = {} - - def close(self): - self.lostChannel() - - def lostChannel(self): - self.deregisterChannel() - self.backend.backendClosed(self) + def getDomainInfo(self): + return self.vm - def registerInterface(self, intf): - key = intf.getInterfaceKey() - self.backendInterfaces[key] = intf + #---------------------------------------------------------------------------- + # Subclass interface. + # Subclasses should define the unimplemented methods.. + # Redefinitions must have the same arguments. - def deregisterInterface(self, intf): - key = intf.getInterfaceKey() - if key in self.backendInterfaces: - del self.backendInterfaces[key] + def initController(self, recreate=False, reboot=False): + self.destroyed = False + if reboot: + self.rebootDevices() - def getInterface(self, dom, handle): - key = (dom, handle) - return self.backendInterfaces.get(key) + def newDevice(self, id, config, recreate=False): + """Create a device with the given config. + Must be defined in subclass. - - def createBackendInterface(self, ctrl, dom, handle): - """Create a backend interface. Define in a subclass. - - @param ctrl: controller - @param dom: backend domain - @param handle: backend handle + @return device """ raise NotImplementedError() + def createDevice(self, config, recreate=False, change=False): + print 'DevController>createDevice>', 'config=', config, 'recreate=', recreate, 'change=', change + dev = self.newDevice(self.nextDeviceId(), config, recreate=recreate) + dev.init(recreate=recreate) + self.addDevice(dev) + idx = self.getDeviceIndex(dev) + recreate = self.vm.get_device_recreate(self.getType(), idx) + dev.attach(recreate=recreate, change=change) + print 'DevController>createDevice<' + + def configureDevice(self, id, config, change=False): + """Reconfigure an existing device. + May be defined in subclass.""" + dev = self.getDevice(id) + if not dev: + raise XendError("invalid device id: " + id) + dev.configure(config, change=change) + + def destroyDevice(self, id, change=False, reboot=False): + """Destroy a device. + May be defined in subclass.""" + dev = self.getDevice(id) + if not dev: + raise XendError("invalid device id: " + id) + dev.destroy(change=change, reboot=reboot) + return dev + + def deleteDevice(self, id, change=True): + dev = self.destroyDevice(id, change=change) + self.removeDevice(dev) + + def destroyController(self, reboot=False): + """Destroy all devices and clean up. + May be defined in subclass.""" + self.destroyed = True + self.destroyDevices(reboot=reboot) + + #---------------------------------------------------------------------------- -class BackendInterface: - """Abstract class for a domain's interface onto a backend controller. - """ - - def __init__(self, controller, dom, handle): - """ + def isDestroyed(self): + return self.destroyed - @param controller: front-end controller - @param dom: back-end domain - @param handle: back-end interface handle - """ - self.factory = controller.factory - self.controller = controller - self.dom = int(dom) - self.handle = handle - self.backend = self.getBackendController() - - def registerInterface(self): - self.backend.registerInterface(self) + def getDevice(self, id): + return self.devices.get(id) - def getInterfaceKey(self): - return (self.controller.dom, self.handle) - - def getBackendController(self): - return self.factory.getBackendController(self.dom) + def getDeviceByIndex(self, idx): + if 0 <= idx < len(self.device_order): + return self.device_order[idx] + else: + return None - def writeRequest(self, msg, response=None): - return self.backend.writeRequest(msg, response=response) + def getDeviceIndex(self, dev): + return self.device_order.index(dev) - def writeResponse(self, msg): - return self.backend.writeResponse(msg) - - def close(self): - self.backend.deregisterInterface(self) - self.controller.backendInterfaceClosed(self) - -class SplitController(Controller): - """Abstract class for a device controller attached to a domain. - A SplitController manages a BackendInterface for each backend domain - it has at least one device for. - """ - - def __init__(self, factory, dom): - Controller.__init__(self, factory, dom) - self.backendInterfaces = {} - self.backendHandle = 0 - self.devices = {} + def getDeviceIds(self): + return [ dev.getId() for dev in self.device_order ] def getDevices(self): - """Get a list of the devices.. - """ - return self.devices.values() + return self.device_order - def delDevice(self, idx): - """Remove the device with the given index from the device table. + def getDeviceConfig(self, id): + return self.getDevice(id).getConfig() - @param idx device index - """ - if idx in self.devices: - del self.devices[idx] + def getDeviceConfigs(self): + return [ dev.getConfig() for dev in self.device_order ] - def getDevice(self, idx): - """Get the device with a given index. - - @param idx device index - @return device (or None) - """ - return self.devices.get(idx) + def getDeviceSxprs(self): + return [ dev.sxpr() for dev in self.device_order ] - def findDevice(self, idx): - """Find a device. If idx is non-negative, - get the device with the given index. If idx is negative, - look for the device with least index greater than -idx - 2. - For example, if idx is -2, look for devices with index - greater than 0, i.e. 1 or above. + def addDevice(self, dev): + self.devices[dev.getId()] = dev + self.device_order.append(dev) + return dev - @param idx device index - @return device (or None) - """ - if idx < 0: - idx = -idx - 2 - val = None - for dev in self.devices.values(): - if dev.idx <= idx: continue - if (val is None) or (dev.idx < val.idx): - val = dev - else: - val = getDevice(idx) - return val + def removeDevice(self, dev): + if dev.getId() in self.devices: + del self.devices[dev.getId()] + if dev in self.device_order: + self.device_order.remove(dev) - def getMaxDeviceIdx(self): - """Get the maximum id used by devices. + def rebootDevices(self): + print 'DevController>rebootDevices>', self + for dev in self.getDevices(): + dev.reboot() - @return maximum idx + def destroyDevices(self, reboot=False): + """Destroy all devices. """ - maxIdx = 0 - for dev in self.devices: - if dev.idx > maxIdx: - maxIdx = dev.idx - return maxIdx - - def getBackendInterfaces(self): - return self.backendInterfaces.values() + for dev in self.getDevices(): + dev.destroy(reboot=reboot) - def getBackendInterfaceByHandle(self, handle): - for b in self.getBackendInterfaces(): - if b.handle == handle: - return b - return None + def getMaxDeviceId(self): + maxid = 0 + for id in self.devices: + if id > maxid: + maxid = id + return maxid - def getBackendInterfaceByDomain(self, dom): - return self.backendInterfaces.get(dom) + def nextDeviceId(self): + id = self.deviceId + self.deviceId += 1 + return id - def getBackendInterface(self, dom): - """Get the backend interface for a domain. - - @param dom: domain - @return: backend controller - """ - b = self.getBackendInterfaceByDomain(dom) - if b is None: - handle = self.backendHandle - self.backendHandle += 1 - b = self.factory.createBackendInterface(self, dom, handle) - b.registerInterface() - self.backendInterfaces[b.dom] = b - return b - - def delBackendInterface(self, ctrlr): - """Remove a backend controller. - - @param ctrlr: backend controller - """ - if ctrlr.dom in self.backendInterfaces: - del self.backendInterfaces[ctrlr.dom] + def getDeviceCount(self): + return len(self.devices) - def backendInterfaceClosed(self, ctrlr): - """Callback called when a backend is closed. - """ - self.delBackendInterface(ctrlr) - class Dev: """Abstract class for a device attached to a device controller. - @ivar idx: identifier - @type idx: String + @ivar id: identifier + @type id: int @ivar controller: device controller - @type controller: DeviceController - @ivar props: property table - @type props: { String: value } + @type controller: DevController """ - def __init__(self, idx, controller): - self.idx = str(idx) + def __init__(self, controller, id, config, recreate=False): self.controller = controller - self.props = {} + self.id = id + self.config = config + self.destroyed = False - def getidx(self): - return self.idx + def getDomain(self): + return self.controller.getDomain() - def setprop(self, k, v): - self.props[k] = v + def getDomainName(self): + return self.controller.getDomainName() - def getprop(self, k, v=None): - return self.props.get(k, v) + def getChannel(self): + return self.controller.getChannel() + + def getDomainInfo(self): + return self.controller.getDomainInfo() + + def getController(self): + return self.controller - def hasprop(self, k): - return k in self.props + def getType(self): + return self.controller.getType() - def delprop(self, k): - if k in self.props: - del self.props[k] + def getId(self): + return self.id - def sxpr(self): - """Get the s-expression for the deivice. - Implement in a subclass. + def getIndex(self): + return self.controller.getDeviceIndex(self) - @return: sxpr - """ - raise NotImplementedError() + def getConfig(self): + return self.config - def configure(self, config, change=0): - raise NotImplementedError() + def isDestroyed(self): + return self.destroyed -class SplitDev(Dev): + #---------------------------------------------------------------------------- + # Subclass interface. + # Define methods in subclass as needed. + # Redefinitions must have the same arguments. - def __init__(self, idx, controller): - Dev.__init__(self, idx, controller) - self.backendDomain = 0 - self.index = None + def init(self, recreate=False, reboot=False): + """Initialization. Called on initial create (when reboot is False) + and on reboot (when reboot is True). When xend is restarting is + called with recreate True. Define in subclass if needed. + """ + self.destroyed = False - def getBackendInterface(self): - return self.controller.getBackendInterface(self.backendDomain) + def attach(self, recreate=False, change=False): + """Attach the device to its front and back ends. + Define in subclass if needed. + """ + pass - def getIndex(self): - return self.index + def reboot(self): + """Reconnect device when the domain is rebooted. + """ + print 'Dev>reboot>', self + self.init(reboot=True) + self.attach() - def setIndex(self, index): - self.index = index + def sxpr(self): + """Get the s-expression for the deivice. + Implement in a subclass if needed. + @return: sxpr + """ + return self.getConfig() + def configure(self, config, change=False): + """Reconfigure the device. + Implement in subclass. + """ + raise NotImplementedError() + + def refresh(self): + """Refresh the device.. + Default no-op. Define in subclass if needed. + """ + pass + + def destroy(self, change=False, reboot=False): + """Destroy the device. + If change is True notify destruction (runtime change). + If reboot is True the device is being destroyed for a reboot. + Redefine in subclass if needed. + """ + self.destroyed = True + pass + #---------------------------------------------------------------------------- diff --git a/tools/python/xen/xend/server/event.py b/tools/python/xen/xend/server/event.py new file mode 100644 index 0000000000..54fbbde9f0 --- /dev/null +++ b/tools/python/xen/xend/server/event.py @@ -0,0 +1,198 @@ +from twisted.internet import reactor, protocol, defer + +from xen.lowlevel import xu + +from xen.xend import sxp +from xen.xend import PrettyPrint +from xen.xend import EventServer +eserver = EventServer.instance() +from xen.xend.XendError import XendError + +from xen.xend import XendRoot + +DEBUG = 1 + +class EventProtocol(protocol.Protocol): + """Asynchronous handler for a connected event socket. + """ + + def __init__(self, daemon): + #protocol.Protocol.__init__(self) + self.daemon = daemon + # Event queue. + self.queue = [] + # Subscribed events. + self.events = [] + self.parser = sxp.Parser() + self.pretty = 0 + + # For debugging subscribe to everything and make output pretty. + self.subscribe(['*']) + self.pretty = 1 + + def dataReceived(self, data): + try: + self.parser.input(data) + if self.parser.ready(): + val = self.parser.get_val() + res = self.dispatch(val) + self.send_result(res) + if self.parser.at_eof(): + self.loseConnection() + except SystemExit: + raise + except: + if DEBUG: + raise + else: + self.send_error() + + def loseConnection(self): + if self.transport: + self.transport.loseConnection() + if self.connected: + reactor.callLater(0, self.connectionLost) + + def connectionLost(self, reason=None): + self.unsubscribe() + + def send_reply(self, sxpr): + io = StringIO.StringIO() + if self.pretty: + PrettyPrint.prettyprint(sxpr, out=io) + else: + sxp.show(sxpr, out=io) + print >> io + io.seek(0) + return self.transport.write(io.getvalue()) + + def send_result(self, res): + return self.send_reply(['ok', res]) + + def send_error(self): + (extype, exval) = sys.exc_info()[:2] + return self.send_reply(['err', + ['type', str(extype)], + ['value', str(exval)]]) + + def send_event(self, val): + return self.send_reply(['event', val[0], val[1]]) + + def unsubscribe(self): + for event in self.events: + eserver.unsubscribe(event, self.queue_event) + + def subscribe(self, events): + self.unsubscribe() + for event in events: + eserver.subscribe(event, self.queue_event) + self.events = events + + def queue_event(self, name, v): + # Despite the name we don't queue the event here. + # We send it because the transport will queue it. + self.send_event([name, v]) + + def opname(self, name): + return 'op_' + name.replace('.', '_') + + def operror(self, name, req): + raise XendError('Invalid operation: ' +name) + + def dispatch(self, req): + op_name = sxp.name(req) + op_method_name = self.opname(op_name) + op_method = getattr(self, op_method_name, self.operror) + return op_method(op_name, req) + + def op_help(self, name, req): + def nameop(x): + if x.startswith('op_'): + return x[3:].replace('_', '.') + else: + return x + + l = [ nameop(k) for k in dir(self) if k.startswith('op_') ] + return l + + def op_quit(self, name, req): + self.loseConnection() + + def op_exit(self, name, req): + sys.exit(0) + + def op_pretty(self, name, req): + self.pretty = 1 + return ['ok'] + + def op_console_disconnect(self, name, req): + id = sxp.child_value(req, 'id') + if not id: + raise XendError('Missing console id') + id = int(id) + self.daemon.console_disconnect(id) + return ['ok'] + + def op_info(self, name, req): + val = ['info'] + val += self.daemon.consoles() + val += self.daemon.blkifs() + val += self.daemon.netifs() + val += self.daemon.usbifs() + return val + + def op_sys_subscribe(self, name, v): + # (sys.subscribe event*) + # Subscribe to the events: + self.subscribe(v[1:]) + return ['ok'] + + def op_sys_inject(self, name, v): + # (sys.inject event) + event = v[1] + eserver.inject(sxp.name(event), event) + return ['ok'] + + def op_trace(self, name, v): + mode = (v[1] == 'on') + self.daemon.tracing(mode) + + def op_log_stderr(self, name, v): + mode = v[1] + logging = XendRoot.instance().get_logging() + if mode == 'on': + logging.addLogStderr() + else: + logging.removeLogStderr() + + def op_debug_msg(self, name, v): + mode = v[1] + import messages + messages.DEBUG = (mode == 'on') + + def op_debug_controller(self, name, v): + mode = v[1] + import controller + controller.DEBUG = (mode == 'on') + + +class EventFactory(protocol.Factory): + """Asynchronous handler for the event server socket. + """ + protocol = EventProtocol + service = None + + def __init__(self, daemon): + #protocol.Factory.__init__(self) + self.daemon = daemon + + def buildProtocol(self, addr): + proto = self.protocol(self.daemon) + proto.factory = self + return proto + + +def listenEvent(daemon, port, interface): + protocol = EventFactory(daemon) + return reactor.listenTCP(port, protocol, interface=interface) + diff --git a/tools/python/xen/xend/server/messages.py b/tools/python/xen/xend/server/messages.py index cd22d445f5..0cea725e3c 100644 --- a/tools/python/xen/xend/server/messages.py +++ b/tools/python/xen/xend/server/messages.py @@ -4,7 +4,12 @@ import types from xen.lowlevel import xu -DEBUG = 0 +DEBUG = False + +#PORT_WILDCARD = 0xefffffff + +"""Wildcard for the control message types.""" +TYPE_WILDCARD = 0xffff """ All message formats. Added to incrementally for the various message types. @@ -94,7 +99,6 @@ blkif_formats = { (CMSG_BLKIF_FE, CMSG_BLKIF_FE_INTERFACE_STATUS), # Notify device status to fe. # Also used to notify 'any' device change with status BLKIF_INTERFACE_STATUS_CHANGED. - # Rename to blkif_fe_interface_status. 'blkif_fe_driver_status_t': (CMSG_BLKIF_FE, CMSG_BLKIF_FE_DRIVER_STATUS), @@ -102,7 +106,6 @@ blkif_formats = { # Xend sets be(s) to BLKIF_INTERFACE_STATUS_DISCONNECTED, # sends blkif_fe_interface_status_t to fe (from each be). # - # Rename to blkif_fe_driver_status. # Reply with i/f count. # The i/f sends probes (using -ve trick), we reply with the info. @@ -227,24 +230,34 @@ USBIF_BE_STATUS_MAPPING_ERROR = 9 usbif_formats = { 'usbif_be_create_t': (CMSG_USBIF_BE, CMSG_USBIF_BE_CREATE), + 'usbif_be_destroy_t': (CMSG_USBIF_BE, CMSG_USBIF_BE_DESTROY), + 'usbif_be_connect_t': (CMSG_USBIF_BE, CMSG_USBIF_BE_CONNECT), + 'usbif_be_disconnect_t': (CMSG_USBIF_BE, CMSG_USBIF_BE_DISCONNECT), + 'usbif_be_claim_port_t': (CMSG_USBIF_BE, CMSG_USBIF_BE_CLAIM_PORT), + 'usbif_be_release_port_t': (CMSG_USBIF_BE, CMSG_USBIF_BE_RELEASE_PORT), + 'usbif_fe_interface_status_changed_t': (CMSG_USBIF_FE, CMSG_USBIF_FE_INTERFACE_STATUS_CHANGED), + 'usbif_fe_driver_status_changed_t': (CMSG_USBIF_FE, CMSG_USBIF_FE_DRIVER_STATUS_CHANGED), + 'usbif_fe_interface_connect_t': (CMSG_USBIF_FE, CMSG_USBIF_FE_INTERFACE_CONNECT), + 'usbif_fe_interface_disconnect_t': - (CMSG_USBIF_FE, CMSG_USBIF_FE_INTERFACE_DISCONNECT) + (CMSG_USBIF_FE, CMSG_USBIF_FE_INTERFACE_DISCONNECT), + } msg_formats.update(usbif_formats) @@ -364,8 +377,8 @@ def unpackMsg(ty, msg): pass if macs: args['mac'] = mac - print 'macs=', macs - print 'args=', args + #print 'macs=', macs + #print 'args=', args for k in macs: del args[k] if DEBUG: @@ -388,7 +401,7 @@ def msgTypeName(ty, subty): return name return None -def printMsg(msg, out=sys.stdout, all=0): +def printMsg(msg, out=sys.stdout, all=False): """Print a message. @param msg: message @@ -407,3 +420,18 @@ def printMsg(msg, out=sys.stdout, all=0): if all: print >>out, 'payload=', msg.get_payload() + +def getMessageType(msg): + """Get a 2-tuple of the message type and subtype. + + @param msg: message + @type msg: xu message + @return: type info + @rtype: (int, int) + """ + hdr = msg.get_header() + return (hdr['type'], hdr.get('subtype')) + +def getMessageId(msg): + hdr = msg.get_header() + return hdr['id'] diff --git a/tools/python/xen/xend/server/netif.py b/tools/python/xen/xend/server/netif.py index 0a3bdacdbd..18819e8282 100755 --- a/tools/python/xen/xend/server/netif.py +++ b/tools/python/xen/xend/server/netif.py @@ -8,102 +8,44 @@ from twisted.internet import defer from xen.xend import sxp from xen.xend import Vifctl -from xen.xend.XendError import XendError +from xen.xend.XendError import XendError, VmError from xen.xend.XendLogging import log from xen.xend import XendVnet from xen.xend.XendRoot import get_component import channel -import controller +from controller import CtrlMsgRcvr, Dev, DevController from messages import * -class NetifBackendController(controller.BackendController): - """Handler for the 'back-end' channel to a network device driver domain. +class NetDev(Dev): + """A network device. """ - - def __init__(self, ctrl, dom): - controller.BackendController.__init__(self, ctrl, dom) - self.addMethod(CMSG_NETIF_BE, - CMSG_NETIF_BE_DRIVER_STATUS, - self.recv_be_driver_status) - self.registerChannel() - - def recv_be_driver_status(self, msg, req): - val = unpackMsg('netif_be_driver_status_t', msg) - status = val['status'] - -class NetifBackendInterface(controller.BackendInterface): - """Handler for the 'back-end' channel to a network device driver domain - on behalf of a front-end domain. - - Each network device is handled separately, so we add no functionality - here. - """ - - pass - -class NetifControllerFactory(controller.SplitControllerFactory): - """Factory for creating network interface controllers. - """ - - def __init__(self): - controller.SplitControllerFactory.__init__(self) - - def createController(self, dom): - """Create a network interface controller for a domain. - - @param dom: domain - @return: netif controller - """ - return NetifController(self, dom) - - def createBackendController(self, dom): - """Create a network device backend controller. - - @param dom: backend domain - @return: backend controller - """ - return NetifBackendController(self, dom) - - def createBackendInterface(self, ctrl, dom, handle): - """Create a network device backend interface. - - @param ctrl: controller - @param dom: backend domain - @param handle: interface handle - @return: backend interface - """ - return NetifBackendInterface(ctrl, dom, handle) - - def getDomainDevices(self, dom): - """Get the network devices for a domain. - - @param dom: domain - @return: netif controller list - """ - netif = self.getControllerByDom(dom) - return (netif and netif.getDevices()) or [] - def getDomainDevice(self, dom, vif): - """Get a virtual network interface device for a domain. - - @param dom: domain - @param vif: virtual interface index - @return: NetDev - """ - netif = self.getControllerByDom(dom) - return (netif and netif.getDevice(vif)) or None - -class NetDev(controller.SplitDev): - """Info record for a network device. - """ - - def __init__(self, vif, ctrl, config): - controller.SplitDev.__init__(self, vif, ctrl) - self.vif = vif + def __init__(self, controller, id, config, recreate=False): + Dev.__init__(self, controller, id, config, recreate=recreate) + self.vif = int(self.id) self.evtchn = None - self.configure(config) self.status = NETIF_INTERFACE_STATUS_DISCONNECTED + self.frontendDomain = self.getDomain() + self.frontendChannel = None + self.backendDomain = None + self.backendChannel = None + self.credit = None + self.period = None + self.mac = None + self.be_mac = None + self.bridge = None + self.script = None + self.ipaddr = None + self.vifname = None + self.configure(self.config, recreate=recreate) + + def init(self, recreate=False, reboot=False): + self.destroyed = False + self.frontendDomain = self.getDomain() + self.frontendChannel = self.getChannel() + cf = channel.channelFactory() + self.backendChannel = cf.openChannel(self.backendDomain) def _get_config_mac(self, config): vmac = sxp.child_value(config, 'mac') @@ -129,7 +71,7 @@ class NetDev(controller.SplitDev): val = None return val - def configure(self, config, change=0): + def configure(self, config, change=False, recreate=False): if change: return self.reconfigure(config) self.config = config @@ -153,12 +95,18 @@ class NetDev(controller.SplitDev): self.bridge = sxp.child_value(config, 'bridge') self.script = sxp.child_value(config, 'script') self.ipaddr = self._get_config_ipaddr(config) or [] + self._config_credit_limit(config) try: - xd = get_component('xen.xend.XendDomain') - self.backendDomain = int(xd.domain_lookup(sxp.child_value(config, 'backend', '0')).id) + if recreate: + self.backendDomain = int(sxp.child_value(config, 'backend', '0')) + else: + #todo: Code below will fail on xend restart when backend is not domain 0. + xd = get_component('xen.xend.XendDomain') + self.backendDomain = int(xd.domain_lookup(sxp.child_value(config, 'backend', '0')).id) except: raise XendError('invalid backend domain') + return self.config def reconfigure(self, config): """Reconfigure the interface with new values. @@ -178,8 +126,10 @@ class NetDev(controller.SplitDev): bridge = sxp.child_value(config, 'bridge') script = sxp.child_value(config, 'script') ipaddr = self._get_config_ipaddr(config) + xd = get_component('xen.xend.XendDomain') backendDomain = str(xd.domain_lookup(sxp.child_value(config, 'backend', '0')).id) + if (mac is not None) and (mac != self.mac): raise XendError("cannot change mac") if (be_mac is not None) and (be_mac != self.be_mac): @@ -199,13 +149,36 @@ class NetDev(controller.SplitDev): setattr(self, k, v) self.config = sxp.merge(config, self.config) self.vifctl("up") + + self._config_credit_limit(config, change=True) return self.config + def _config_credit_limit(self, config, change=False): + period = sxp.child_value(config, 'period') + credit = sxp.child_value(config, 'credit') + if period and credit: + try: + period = int(period) + credit = int(credit) + except ex: + raise XendError('vif: invalid credit limit') + if change: + self.setCreditLimit(credit, period) + self.config = sxp.merge([sxp.name(self.config), + ['credit', credit], + ['period', period]], + self.config) + else: + self.period = period + self.credit = credit + elif period or credit: + raise XendError('vif: invalid credit limit') + def sxpr(self): vif = str(self.vif) mac = self.get_mac() val = ['vif', - ['idx', self.idx], + ['id', self.id], ['vif', vif], ['mac', mac], ['vifname', self.vifname], @@ -219,12 +192,15 @@ class NetDev(controller.SplitDev): val.append(['script', self.script]) for ip in self.ipaddr: val.append(['ip', ip]) + if self.credit: + val.append(['credit', self.credit]) + if self.period: + val.append(['period', self.period]) if self.evtchn: val.append(['evtchn', self.evtchn['port1'], self.evtchn['port2']]) - if self.index is not None: - val.append(['index', self.index]) + val.append(['index', self.getIndex()]) return val def get_vifname(self): @@ -233,7 +209,7 @@ class NetDev(controller.SplitDev): return self.vifname def default_vifname(self): - return "vif%d.%d" % (self.controller.dom, self.vif) + return "vif%d.%d" % (self.frontendDomain, self.vif) def get_mac(self): """Get the MAC address as a string. @@ -248,7 +224,7 @@ class NetDev(controller.SplitDev): def vifctl_params(self, vmname=None): """Get the parameters to pass to vifctl. """ - dom = self.controller.dom + dom = self.frontendDomain if vmname is None: xd = get_component('xen.xend.XendDomain') try: @@ -278,11 +254,23 @@ class NetDev(controller.SplitDev): if vnet: vnet.vifctl(op, self.get_vifname(), self.get_mac()) - def attach(self): - d = self.send_be_create() - d.addCallback(self.respond_be_create) - return d + def attach(self, recreate=False, change=False): + if recreate: + pass + else: + self.send_be_create() + if self.credit and self.period: + self.send_be_creditlimit(self.credit, self.period) + self.vifctl('up', vmname=self.getDomainName()) + + def closeEvtchn(self): + if self.evtchn: + channel.eventChannelClose(self.evtchn) + self.evtchn = None + def openEvtchn(self): + self.evtchn = channel.eventChannel(self.backendDomain, self.frontendDomain) + def getEventChannelBackend(self): val = 0 if self.evtchn: @@ -296,90 +284,79 @@ class NetDev(controller.SplitDev): return val def send_be_create(self): - d = defer.Deferred() msg = packMsg('netif_be_create_t', - { 'domid' : self.controller.dom, + { 'domid' : self.frontendDomain, 'netif_handle' : self.vif, 'be_mac' : self.be_mac or [0, 0, 0, 0, 0, 0], 'mac' : self.mac, #'vifname' : self.vifname }) - self.getBackendInterface().writeRequest(msg, response=d) - return d - - def respond_be_create(self, msg): - val = unpackMsg('netif_be_create_t', msg) - return self + msg = self.backendChannel.requestResponse(msg) + # todo: check return status - def destroy(self, change=0): + def destroy(self, change=False, reboot=False): """Destroy the device's resources and disconnect from the back-end device controller. If 'change' is true notify the front-end interface. @param change: change flag """ + self.destroyed = True self.status = NETIF_INTERFACE_STATUS_CLOSED - def cb_destroy(val): - self.send_be_destroy() - self.getBackendInterface().close() - if change: - self.reportStatus() - log.debug("Destroying vif domain=%d vif=%d", self.controller.dom, self.vif) - if self.evtchn: - channel.eventChannelClose(self.evtchn) + log.debug("Destroying vif domain=%d vif=%d", self.frontendDomain, self.vif) + self.closeEvtchn() self.vifctl('down') - d = self.send_be_disconnect() - d.addCallback(cb_destroy) + self.send_be_disconnect() + self.send_be_destroy() + if change: + self.reportStatus() def send_be_disconnect(self): - d = defer.Deferred() msg = packMsg('netif_be_disconnect_t', - { 'domid' : self.controller.dom, + { 'domid' : self.frontendDomain, 'netif_handle' : self.vif }) - self.getBackendInterface().writeRequest(msg, response=d) - return d + return self.backendChannel.writeRequest(msg) def send_be_destroy(self, response=None): msg = packMsg('netif_be_destroy_t', - { 'domid' : self.controller.dom, + { 'domid' : self.frontendDomain, 'netif_handle' : self.vif }) - self.controller.delDevice(self.vif) - self.getBackendInterface().writeRequest(msg, response=response) + return self.backendChannel.writeRequest(msg) - def recv_fe_interface_connect(self, val, req): - if not req: return - self.evtchn = channel.eventChannel(self.backendDomain, self.controller.dom) + def recv_fe_interface_connect(self, val): + self.openEvtchn() msg = packMsg('netif_be_connect_t', - { 'domid' : self.controller.dom, + { 'domid' : self.frontendDomain, 'netif_handle' : self.vif, 'evtchn' : self.getEventChannelBackend(), 'tx_shmem_frame' : val['tx_shmem_frame'], 'rx_shmem_frame' : val['rx_shmem_frame'] }) - d = defer.Deferred() - d.addCallback(self.respond_be_connect) - self.getBackendInterface().writeRequest(msg, response=d) - - def respond_be_connect(self, msg): - val = unpackMsg('netif_be_connect_t', msg) - dom = val['domid'] - vif = val['netif_handle'] + msg = self.backendChannel.requestResponse(msg) + #todo: check return status self.status = NETIF_INTERFACE_STATUS_CONNECTED self.reportStatus() + + def setCreditLimit(self, credit, period): + #todo: these params should be in sxpr and vif config. + self.credit = credit + self.period = period + self.send_be_creditlimit(credit, period) + + def getCredit(self): + return self.credit + + def getPeriod(self): + return self.period def send_be_creditlimit(self, credit, period): msg = packMsg('netif_be_creditlimit_t', - { 'domid' : self.controller.dom, + { 'domid' : self.frontendDomain, 'netif_handle' : self.vif, 'credit_bytes' : credit, 'period_usec' : period }) - d = defer.Deferred() - d.addCallback(self.respond_be_creditlimit) - self.getBackendInterface().writeRequest(msg, response=d) - - def respond_be_creditlimit(self, msg): - val = unpackMsg('netif_be_creditlimit_t', msg) - return self + msg = self.backendChannel.requestResponse(msg) + # todo: check return status - def reportStatus(self, resp=0): + def reportStatus(self, resp=False): msg = packMsg('netif_fe_interface_status_t', { 'handle' : self.vif, 'status' : self.status, @@ -387,99 +364,80 @@ class NetDev(controller.SplitDev): 'domid' : self.backendDomain, 'mac' : self.mac }) if resp: - self.controller.writeResponse(msg) + self.frontendChannel.writeResponse(msg) else: - self.controller.writeRequest(msg) + self.frontendChannel.writeRequest(msg) def interfaceChanged(self): - """Notify the font-end that a device has been added or removed. + """Notify the front-end that a device has been added or removed. """ self.reportStatus() -class NetifController(controller.SplitController): +class NetifController(DevController): """Network interface controller. Handles all network devices for a domain. """ - def __init__(self, factory, dom): - controller.SplitController.__init__(self, factory, dom) - self.devices = {} - self.addMethod(CMSG_NETIF_FE, - CMSG_NETIF_FE_DRIVER_STATUS, - self.recv_fe_driver_status) - self.addMethod(CMSG_NETIF_FE, - CMSG_NETIF_FE_INTERFACE_STATUS, - self.recv_fe_interface_status) - self.addMethod(CMSG_NETIF_FE, - CMSG_NETIF_FE_INTERFACE_CONNECT, - self.recv_fe_interface_connect) - self.registerChannel() + def __init__(self, dctype, vm, recreate=False): + DevController.__init__(self, dctype, vm, recreate=recreate) + self.channel = None + self.rcvr = None + self.channel = None + + def initController(self, recreate=False, reboot=False): + self.destroyed = False + self.channel = self.getChannel() + # Register our handlers for incoming requests. + self.rcvr = CtrlMsgRcvr(self.channel) + self.rcvr.addHandler(CMSG_NETIF_FE, + CMSG_NETIF_FE_DRIVER_STATUS, + self.recv_fe_driver_status) + self.rcvr.addHandler(CMSG_NETIF_FE, + CMSG_NETIF_FE_INTERFACE_STATUS, + self.recv_fe_interface_status) + self.rcvr.addHandler(CMSG_NETIF_FE, + CMSG_NETIF_FE_INTERFACE_CONNECT, + self.recv_fe_interface_connect) + self.rcvr.registerChannel() + if reboot: + self.rebootDevices() + + def destroyController(self, reboot=False): + """Destroy the controller and all devices. + """ + self.destroyed = True + log.debug("Destroying netif domain=%d", self.getDomain()) + self.destroyDevices(reboot=reboot) + if self.rcvr: + self.rcvr.deregisterChannel() def sxpr(self): - val = ['netif', ['dom', self.dom]] + val = ['netif', ['dom', self.getDomain()]] return val - def lostChannel(self): - """Method called when the channel has been lost. - """ - controller.Controller.lostChannel(self) + def newDevice(self, id, config, recreate=False): + """Create a network device. - def addDevice(self, vif, config): - """Add a network interface. - - @param vif: device index - @param config: device configuration - @return: device - """ - if vif in self.devices: - raise XendError('device exists:' + str(vif)) - dev = NetDev(vif, self, config) - self.devices[vif] = dev - return dev - - def destroy(self): - """Destroy the controller and all devices. - """ - self.destroyDevices() - - def destroyDevices(self): - """Destroy all devices. - """ - for dev in self.getDevices(): - dev.destroy() - - def attachDevice(self, vif, config, recreate=0): - """Attach a network device. - - @param vif: interface index + @param id: interface id @param config: device configuration @param recreate: recreate flag (true after xend restart) @return: deferred """ - dev = self.addDevice(vif, config) - if recreate: - d = defer.succeed(dev) - else: - d = dev.attach() - return d + return NetDev(self, id, config, recreate=recreate) def limitDevice(self, vif, credit, period): if vif not in self.devices: raise XendError('device does not exist for credit limit: vif' - + str(self.dom) + '.' + str(vif)) + + str(self.getDomain()) + '.' + str(vif)) dev = self.devices[vif] - d = dev.send_be_creditlimit(credit, period) - return d + return dev.setCreditLimit(credit, period) - def recv_fe_driver_status(self, msg, req): - if not req: return - print - print 'recv_fe_driver_status>' + def recv_fe_driver_status(self, msg): msg = packMsg('netif_fe_driver_status_t', { 'status' : NETIF_DRIVER_STATUS_UP, ## FIXME: max_handle should be max active interface id - 'max_handle' : len(self.devices) - #'max_handle' : self.getMaxDeviceIdx() + 'max_handle' : self.getDeviceCount() + #'max_handle' : self.getMaxDeviceId() }) # Two ways of doing it: # 1) front-end requests driver status, we reply with the interface count, @@ -492,43 +450,37 @@ class NetifController(controller.SplitController): # # We really want to use 1), but at the moment the xenU kernel panics # in that mode, so we're sticking to 2) for now. - resp = 0 + resp = False if resp: - self.writeResponse(msg) + self.channel.writeResponse(msg) else: for dev in self.devices.values(): dev.reportStatus() - self.writeRequest(msg) + self.channel.writeRequest(msg) return resp - def recv_fe_interface_status(self, msg, req): - if not req: return - print - val = unpackMsg('netif_fe_interface_status_t', msg) - print "recv_fe_interface_status>", val + def recv_fe_interface_status(self, msg): vif = val['handle'] dev = self.findDevice(vif) if dev: - print 'recv_fe_interface_status>', 'dev=', dev - dev.reportStatus(resp=1) + dev.reportStatus(resp=True) else: + log.error('Received netif_fe_interface_status for unknown vif: dom=%d vif=%d', + self.dom, vif) msg = packMsg('netif_fe_interface_status_t', { 'handle' : -1, 'status' : NETIF_INTERFACE_STATUS_CLOSED, }); - print 'recv_fe_interface_status>', 'no dev, returning -1' - self.writeResponse(msg) - return 1 + self.channel.writeResponse(msg) + return True - - def recv_fe_interface_connect(self, msg, req): + def recv_fe_interface_connect(self, msg): val = unpackMsg('netif_fe_interface_connect_t', msg) vif = val['handle'] - print - print "recv_fe_interface_connect", val dev = self.getDevice(vif) if dev: - dev.recv_fe_interface_connect(val, req) + dev.recv_fe_interface_connect(val) else: log.error('Received netif_fe_interface_connect for unknown vif: dom=%d vif=%d', self.dom, vif) + diff --git a/tools/python/xen/xend/server/pciif.py b/tools/python/xen/xend/server/pciif.py new file mode 100644 index 0000000000..83f061a0b2 --- /dev/null +++ b/tools/python/xen/xend/server/pciif.py @@ -0,0 +1,59 @@ +import types + +import xen.lowlevel.xc; xc = xen.lowlevel.xc.new() + +from xen.xend import sxp +from xen.xend.XendError import VmError + +from controller import Dev, DevController + +def parse_pci(val): + """Parse a pci field. + """ + if isinstance(val, types.StringType): + radix = 10 + if val.startswith('0x') or val.startswith('0X'): + radix = 16 + v = int(val, radix) + else: + v = val + return v + +class PciDev(Dev): + + def __init__(self, controller, id, config, recreate=False): + Dev.__init__(self, controller, id, config, recreate=recreate) + bus = sxp.child_value(self.config, 'bus') + if not bus: + raise VmError('pci: Missing bus') + dev = sxp.child_value(self.config, 'dev') + if not dev: + raise VmError('pci: Missing dev') + func = sxp.child_value(self.config, 'func') + if not func: + raise VmError('pci: Missing func') + try: + bus = parse_pci(bus) + dev = parse_pci(dev) + func = parse_pci(func) + except: + raise VmError('pci: invalid parameter') + + def attach(self, recreate=False, change=False): + rc = xc.physdev_pci_access_modify(dom = self.getDomain(), + bus = bus, + dev = dev, + func = func, + enable = True) + if rc < 0: + #todo non-fatal + raise VmError('pci: Failed to configure device: bus=%s dev=%s func=%s' % + (bus, dev, func)) + + def destroy(self, change=False, reboot=False): + pass + +class PciController(DevController): + + def newDevice(self, id, config, recreate=False): + return PciDev(self, id, config, recreate=recreate) diff --git a/tools/python/xen/xend/server/usbif.py b/tools/python/xen/xend/server/usbif.py index d90997634b..e6d263b820 100644 --- a/tools/python/xen/xend/server/usbif.py +++ b/tools/python/xen/xend/server/usbif.py @@ -4,59 +4,61 @@ """Support for virtual USB hubs. """ -from twisted.internet import defer -#defer.Deferred.debug = 1 - from xen.xend import sxp from xen.xend.XendLogging import log from xen.xend.XendError import XendError import channel -import controller +from controller import Dev, DevController from messages import * -class UsbifBackendController(controller.BackendController): - """ Handler for the 'back-end' channel to a USB hub domain. - Must be connected using connect() before it can be used. - Do not create directly - use getBackend() on the UsbifController. +class UsbBackend: + """Handler for the 'back-end' channel to a USB device driver domain + on behalf of a front-end domain. """ + def __init__(self, controller, id, dom): + self.controller = controller + self.id = id + self.destroyed = False + self.connected = False + self.connecting = False + self.frontendDomain = self.controller.getDomain() + self.backendDomain = dom + self.frontendChannel = None + self.backendChannel = None - def __init__(self, ctrl, dom): - controller.BackendController.__init__(self, ctrl, dom) - self.connected = 0 - self.evtchn = None - self.addMethod(CMSG_USBIF_BE, - CMSG_USBIF_BE_DRIVER_STATUS_CHANGED, - self.recv_be_driver_status_changed) - self.registerChannel() + def init(self, recreate=False, reboot=False): + self.frontendChannel = self.controller.getChannel() + cf = channel.channelFactory() + self.backendChannel = cf.openChannel(self.backendDomain) def __str__(self): - return '' % (self.dom) - - def recv_be_driver_status_changed(self, msg, req): - """Request handler for be_driver_status_changed messages. + return ('' + % (self.frontendDomain, + self.backendDomain, + self.id)) + + def closeEvtchn(self): + if self.evtchn: + channel.eventChannelClose(self.evtchn) + self.evtchn = None + + def openEvtchn(self): + self.evtchn = channel.eventChannel(self.backendDomain, self.frontendDomain) - @param msg: message - @type msg: xu message - @param req: request flag (true if the msg is a request) - @type req: bool - """ - val = unpackMsg('usbif_be_driver_status_changed_t', msg) - status = val['status'] - -class UsbifBackendInterface(controller.BackendInterface): - """Handler for the 'back-end' channel to a network device driver domain - on behalf of a front-end domain. + def getEventChannelBackend(self): + val = 0 + if self.evtchn: + val = self.evtchn['port1'] + return val - Each network device is handled separately, so we add no functionality - here. - """ - def __init__(self, ctrl, dom): - controller.BackendInterface.__init__(self, ctrl, dom, 0) - self.connected = 0 - self.connecting = False + def getEventChannelFrontend(self): + val = 0 + if self.evtchn: + val = self.evtchn['port2'] + return val - def connect(self, recreate=0): + def connect(self, recreate=False): """Connect the controller to the usbif control interface. @param recreate: true if after xend restart @@ -64,86 +66,53 @@ class UsbifBackendInterface(controller.BackendInterface): """ log.debug("Connecting usbif %s", str(self)) if recreate or self.connected or self.connecting: - d = defer.succeed(self) + pass else: - self.connecting = True - d = self.send_be_create() - d.addCallback(self.respond_be_create) - return d + self.send_be_create() def send_be_create(self): - d = defer.Deferred() msg = packMsg('usbif_be_create_t', - { 'domid' : self.controller.dom }) - self.writeRequest(msg, response=d) - return d - - def respond_be_create(self, msg): + { 'domid' : self.frontendDomain }) + msg = self.backendChannel.requestResponse(msg) val = unpackMsg('usbif_be_create_t', msg) log.debug('>UsbifBackendController>respond_be_create> %s', str(val)) self.connected = True - return self - def destroy(self): + def destroy(self, reboot=False): """Disconnect from the usbif control interface and destroy it. """ - def cb_destroy(val): - self.send_be_destroy() - d = defer.Deferred() - d.addCallback(cb_destroy) - self.send_be_disconnect(response=d) + self.destroyed = True + self.send_be_disconnect() + self.send_be_destroy() + self.closeEvtchn() - def send_be_disconnect(self, response=None): + def send_be_disconnect(self): log.debug('>UsbifBackendController>send_be_disconnect> %s', str(self)) msg = packMsg('usbif_be_disconnect_t', - { 'domid' : self.controller.dom }) - self.writeRequest(msg, response=response) + { 'domid' : self.frontendDomain }) + self.backendChannel.writeRequest(msg) def send_be_destroy(self, response=None): log.debug('>UsbifBackendController>send_be_destroy> %s', str(self)) msg = packMsg('usbif_be_destroy_t', - { 'domid' : self.controller.dom }) - self.writeRequest(msg, response=response) - - def send_be_claim_port(self, path): - d=defer.Deferred() - log.debug(">UsbifBackendController>send_be_claim_port> about to claim port %s" % path) - def cb(blah): log.debug(">UsbifBackendController> Claim port completed") - d.addCallback(cb) - msg = packMsg('usbif_be_claim_port_t', - { 'domid' : self.controller.dom, - 'path' : path, - 'usbif_port' : self.controller.devices[path], - 'status' : 0}) - self.writeRequest(msg, response=d) - # No need to add any callbacks, since the guest polls its virtual ports - # anyhow, somewhat like a UHCI controller ;-) - return d + { 'domid' : self.frontendDomain }) + self.backendChannel.writeRequest(msg, response=response) - def send_be_release_port(self, path): - d=defer.Deferred() - def cb(blah): log.debug(">UsbifBackendController> Release port completed") - d.addCallback(cb) - msg = packMsg('usbif_be_release_port_t', - { 'domid' : self.controller.dom, - 'path' : path }) - self.writeRequest(msg, response) - # No need to add any callbacks, since the guest polls its virtual ports - # anyhow, somewhat like a UHCI controller ;-) def connectInterface(self, val): - self.evtchn = channel.eventChannel(0, self.controller.dom) + self.openEvtchn() log.debug(">UsbifBackendController>connectInterface> connecting usbif to event channel %s ports=%d:%d", - str(self), self.evtchn['port1'], self.evtchn['port2']) + str(self), + self.getEventChannelBackend(), + self.getEventChannelFrontend()) msg = packMsg('usbif_be_connect_t', - { 'domid' : self.controller.dom, - 'evtchn' : self.evtchn['port1'], + { 'domid' : self.frontendDomain, + 'evtchn' : self.getEventChannelBackend(), 'shmem_frame' : val['shmem_frame'], 'bandwidth' : 500 # XXX fix bandwidth! }) - d = defer.Deferred() - d.addCallback(self.respond_be_connect) - self.writeRequest(msg, response=d) + msg = self.backendChannel.requestResponse(msg) + self.respond_be_connect(msg) def respond_be_connect(self, msg): """Response handler for a be_connect message. @@ -153,196 +122,201 @@ class UsbifBackendInterface(controller.BackendInterface): """ val = unpackMsg('usbif_be_connect_t', msg) log.debug('>UsbifBackendController>respond_be_connect> %s, %s', str(self), str(val)) - d = defer.Deferred() - def cb(blah): - log.debug(">UsbifBackendController> Successfully connected USB interface for domain %d" % self.controller.dom) - self.controller.claim_ports() - d.addCallback(cb) - self.send_fe_interface_status_changed(d) + self.send_fe_interface_status_changed() + log.debug(">UsbifBackendController> Successfully connected USB interface for domain %d" % self.frontendDomain) + self.controller.claim_ports() - def send_fe_interface_status_changed(self, response=None): + def send_fe_interface_status_changed(self): msg = packMsg('usbif_fe_interface_status_changed_t', - { 'status' : USBIF_INTERFACE_STATUS_CONNECTED, - 'domid' : 0, ## FIXME: should be domid of backend - 'evtchn' : self.evtchn['port2'], + { 'status' : USBIF_INTERFACE_STATUS_CONNECTED, + 'domid' : self.backendDomain, + 'evtchn' : self.getEventChannelFrontend(), 'bandwidth' : 500, - 'num_ports' : len(self.controller.devices.keys())}) - self.controller.writeRequest(msg, response=response) + 'num_ports' : len(self.controller.devices) + }) + self.frontendChannel.writeRequest(msg) + + def interfaceChanged(self): + self.send_fe_interface_status_changed() + +class UsbDev(Dev): + + def __init__(self, controller, id, config, recreate=False): + Dev.__init__(self, controller, id, config, recreate=recreate) + self.port = id + self.path = None + self.frontendDomain = self.getDomain() + self.frontendChannel = None + self.backendDomain = 0 + self.backendChannel = None + self.configure(self.config, recreate=recreate) + + def init(self, recreate=False, reboot=False): + self.destroyed = False + self.frontendDomain = self.getDomain() + self.frontendChannel = self.getChannel() + backend = self.getBackend() + self.backendChannel = backend.backendChannel -class UsbifControllerFactory(controller.SplitControllerFactory): - """Factory for creating USB interface controllers. - """ + def configure(self, config, change=False, recreate=False): + if change: + raise XendError("cannot reconfigure usb") + #todo: FIXME: Use sxp access methods to get this value. + # Must not use direct indexing. + self.path = config[1][1] + + #todo: FIXME: Support configuring the backend domain. +## try: +## self.backendDomain = int(sxp.child_value(config, 'backend', '0')) +## except: +## raise XendError('invalid backend domain') + + def attach(self, recreate=False, change=False): + if recreate: + pass + else: + self.attachBackend() + if change: + self.interfaceChanged() + + def sxpr(self): + val = ['usb', + ['id', self.id], + ['port', self.port], + ['path', self.path], + ] + val.append(['index', self.getIndex()]) + return val - def __init__(self): - controller.ControllerFactory.__init__(self) - self.backendControllers = {} + def getBackend(self): + return self.controller.getBackend(self.backendDomain) - def createController(self, dom, recreate=0): - """Create a USB device controller for a domain. + def destroy(self, change=False, reboot=False): + """Destroy the device. If 'change' is true notify the front-end interface. - @param dom: domain - @type dom: int - @param recreate: if true it's a recreate (after xend restart) - @type recreate: bool - @return: block device controller - @rtype: UsbifController - """ - usbif = self.getControllerByDom(dom) - if usbif is None: - usbif = UsbifController(self, dom) - self.addController(usbif) - return usbif - - def getDomainDevices(self, dom): - """Get the block devices for a domain. - - @param dom: domain - @type dom: int - @return: devices - @rtype: [device] + @param change: change flag """ - usbif = self.getControllerByDom(dom) - return (usbif and usbif.getDevices()) or [] - - def getDomainDevice(self, dom, vdev): - """Get a block device from a domain. - - @param dom: domain - @type dom: int - @param vdev: device index - @type vdev: int - @return: device - @rtype: device - """ - usbif = self.getControllerByDom(dom) - return (usbif and usbif.getDevice(vdev)) or None - - def createBackendInterface(self, ctrl, dom, handle): - """Create a network device backend interface. - - @param ctrl: controller - @param dom: backend domain - @param handle: interface handle - @return: backend interface + self.destroyed = True + log.debug("Destroying usb domain=%d id=%s", self.frontendDomain, self.id) + self.send_be_release_port() + if change: + self.interfaceChanged() + + def interfaceChanged(self): + """Tell the back-end to notify the front-end that a device has been + added or removed. """ - return UsbifBackendInterface(ctrl, dom) + self.getBackend().interfaceChanged() - def getBackendController(self, dom): - """Get the backend controller for a domain, creating - if necessary. + def attachBackend(self): + """Attach the device to its controller. - @param dom: backend domain - @return: backend controller """ - b = self.getBackendControllerByDomain(dom) - if b is None: - b = self.createBackendController(dom) - self.backendControllers[b.dom] = b - return b + self.getBackend().connect() + + def send_be_claim_port(self): + log.debug(">UsbifBackendController>send_be_claim_port> about to claim port %s" % self.path) + msg = packMsg('usbif_be_claim_port_t', + { 'domid' : self.frontendDomain, + 'path' : self.path, + 'usbif_port' : self.port, + 'status' : 0}) + self.backendChannel.writeRequest(msg) + log.debug(">UsbifBackendController> Claim port completed") + # No need to add any callbacks, since the guest polls its virtual ports + # anyhow, somewhat like a UHCI controller ;-) - def createBackendController(self, dom): - return UsbifBackendController(self, dom) + def send_be_release_port(self): + msg = packMsg('usbif_be_release_port_t', + { 'domid' : self.frontendDomain, + 'path' : self.path }) + self.backendChannel.writeRequest(msg) + log.debug(">UsbifBackendController> Release port completed") + # No need to add any callbacks, since the guest polls its virtual ports + # anyhow, somewhat like a UHCI controller ;-) -class UsbifController(controller.SplitController): +class UsbifController(DevController): """USB device interface controller. Handles all USB devices for a domain. """ - def __init__(self, factory, dom): + def __init__(self, dctype, vm, recreate=False): """Create a USB device controller. - Do not call directly - use createController() on the factory instead. """ - controller.SplitController.__init__(self, factory, dom) - self.num_ports = 0 - self.devices = {} - self.addMethod(CMSG_USBIF_FE, - CMSG_USBIF_FE_DRIVER_STATUS_CHANGED, - self.recv_fe_driver_status_changed) - self.addMethod(CMSG_USBIF_FE, - CMSG_USBIF_FE_INTERFACE_CONNECT, - self.recv_fe_interface_connect) - self.registerChannel() - try: - self.backendDomain = 0 #int(sxp.child_value(config, 'backend', '0')) TODO: configurable backends - except: - raise XendError('invalid backend domain') - + DevController.__init__(self, dctype, vm, recreate=recreate) + self.backends = {} + self.backendId = 0 + self.rcvr = None + + def init(self, recreate=False, reboot=False): + self.destroyed = False + self.rcvr = CtrlMsgRcvr(self.getChannel()) + self.rcvr.addHandler(CMSG_USBIF_FE, + CMSG_USBIF_FE_DRIVER_STATUS_CHANGED, + self.recv_fe_driver_status_changed) + self.rcvr.addHandler(CMSG_USBIF_FE, + CMSG_USBIF_FE_INTERFACE_CONNECT, + self.recv_fe_interface_connect) + self.rcvr.registerChannel() + if reboot: + self.rebootBackends() + self.rebootDevices() def sxpr(self): - val = ['usbif', ['dom', self.dom]] + val = ['usbif', + ['dom', self.getDomain()]] return val - def createBackend(self, dom, handle): - return UsbifBackendController(self, dom, handle) + def newDevice(self, id, config, recreate=False): + return UsbDev(self, id, config, recreate=recreate) - def getDevices(self): - return self.devices.values() - - def attachDevice(self, path, recreate=0): - """Add privileges for a particular device to the domain. - @param path: the Linux-style path to the device port - """ - self.devices[path[1][1]] = self.num_ports - self.num_ports += 1 - log.debug(">UsbifController>attachDevice> device: %s, port: %d" % - (str(path), self.num_ports ) ) - - backend =self.getBackendInterface(self.backendDomain) - - def cb(blah): - log.debug(">UsbifController> Backend created") - pass - d = backend.connect() - d.addCallback(cb) # Chaining the claim port operation - return d - - - def removeDevice(self, path): - self.delDevice(path) - backend = self.getBackendInterface(self.backendDomain) - return backend.send_be_release_port(path) - - def delDevice(self, path): - if path in self.devices: - del self.devices[path] - - def attachPort(self, path, recreate=0): - """Attach a device to the specified interface. - On success the returned deferred will be called with the device. - - @return: deferred - @rtype: Deferred - """ - return self.attachDevice(path) - - def destroy(self): + def destroyController(self, reboot=False): """Destroy the controller and all devices. """ - log.debug("Destroying usbif domain=%d", self.dom) - self.destroyBackends() - - def destroyDevices(self): - """Destroy all devices. - """ - for path in self.getDevices(): - self.removeDevice(path) - - def destroyBackends(self): - for backend in self.getBackendInterfaces(): - backend.destroy() + self.destroyed = True + log.debug("Destroying blkif domain=%d", self.getDomain()) + self.destroyDevices(reboot=reboot) + self.destroyBackends(reboot=reboot) + if self.rcvr: + self.rcvr.deregisterChannel() + + def rebootBackends(self): + for backend in self.backends.values(): + backend.init(reboot=True) + + def getBackendById(self, id): + return self.backends.get(id) + + def getBackendByDomain(self, dom): + for backend in self.backends.values(): + if backend.backendDomain == dom: + return backend + return None + + def getBackend(self, dom): + backend = self.getBackendByDomain(dom) + if backend: return backend + backend = UsbBackend(self, self.backendId, dom) + self.backendId += 1 + self.backends[backend.getId()] = backend + backend.init() + return backend + + def destroyBackends(self, reboot=False): + for backend in self.backends.values(): + backend.destroy(reboot=reboot) - def recv_fe_driver_status_changed(self, msg, req): + def recv_fe_driver_status_changed(self, msg): val = unpackMsg('usbif_fe_driver_status_changed_t', msg) log.debug('>UsbifController>recv_fe_driver_status_changed> %s', str(val)) - # For each backend? + #todo: FIXME: For each backend? msg = packMsg('usbif_fe_interface_status_changed_t', { 'status' : USBIF_INTERFACE_STATUS_DISCONNECTED, - 'domid' : 0, ## FIXME: should be domid of backend + 'domid' : 0, #todo: FIXME: should be domid of backend 'evtchn' : 0 }) - d = defer.Deferred() - d.addCallback(self.disconnected_resp) - self.writeRequest(msg) + msg = self.getChannel().requestResponse(msg) + self.disconnected_resp(msg) def disconnected_resp(self, msg): val = unpackMsg('usbif_fe_interface_status_changed_t', msg) @@ -351,18 +325,21 @@ class UsbifController(controller.SplitController): else: log.debug(">UsbifController>disconnected_resp> interface disconnected OK") - def recv_fe_interface_connect(self, msg, req): + def recv_fe_interface_connect(self, msg): val = unpackMsg('usbif_fe_interface_status_changed_t', msg) log.debug(">UsbifController>recv_fe_interface_connect> notifying backend") - backend = self.getBackendInterfaceByHandle(0) + #todo: FIXME: generalise to more than one backend. + id = 0 + backend = self.getBackendById(id) if backend: - d = backend.connectInterface(val) + try: + backend.connectInterface(val) + except IOError, ex: + log.error("Exception connecting backend: %s", ex) else: - log.error('>UsbifController>recv_fe_interface_connect> unknown interface') + log.error('interface connect on unknown interface: id=%d', id) def claim_ports(self): - backend = self.getBackendInterfaceByHandle(0) - for path in self.devices.keys(): - log.debug(">UsbifController>claim_ports> claiming port... %s" % path) - backend.send_be_claim_port(path) + for dev in self.devices.values(): + dev.send_be_claim_port() -- 2.30.2